pip install humatron-python-sdk locked-dict flask requests
The `flask` dependency is optional and is used for the examples only.

REST request | description |
---|---|
register | This request is sent when a new worker instance is hired and just before the onboarding commences. |
unregister | This request is sent when a worker instance is terminated. |
pause | This request is sent when a worker instance is temporarily paused. |
resume | This request is sent when a worker instance is resumed from paused state. |
message | This request is used to send one or more messages from the outside world to the worker instance. |
interview | An interview request is used to conduct an interview for the worker template (i.e. specialist). |
class | description |
---|---|
HumatronWorkerApi | The class mirrors the REST API, providing a single method process_request to implement. This method accepts a request object as input and returns a response object. You will need to manage the request state and handle all asynchronous logic yourself. This approach is flexible but requires additional effort. |
HumatronAsyncWorker | The class is designed to handle multiple requests concurrently in asynchronous mode. This class inherits from HumatronWorkerApi , but in the process_payload_part method it implements, the input request payload is divided into separate payload parts. This design allows each part to be processed independently without the need to manage asynchronous execution explicitly. All these response payload parts are processed in asynchronous mode. This class also provides a push_response method to push responses to the client without initiating a request. This response will be sent to the AI worker along with the next portion of responses. |
If you use the `HumatronWorkerApi` implementation, the results will be returned immediately. If you use the `HumatronAsyncWorker` implementation, the accumulated results will be returned with the next request.

# AI worker Example.
2from uuid import uuid4
3from humatron.worker.client import *
4from humatron.worker.utils import make_default_response_payload
5from langchain_core.output_parsers import StrOutputParser
6from langchain_core.prompts import ChatPromptTemplate
7from langchain_openai import ChatOpenAI
8
9# Define the chat prompt template.
10_prompt = ChatPromptTemplate.from_messages([('system', 'You are a helpful assistant.'), ('user', '{input}')])
11
12# Worker implementation based on `python-sdk` library.
13class HumatronWorkerChatGpt(HumatronAsyncWorker):
14 def __init__(self, openai_api_key: str):
15 super().__init__()
16 # Create the processing chain.
17 self._chain = _prompt | ChatOpenAI(openai_api_key=openai_api_key) | StrOutputParser()
18
19 # Implement the `process_payload_part` method.
20 def process_payload_part(self, rpp: RequestPayloadPart, _: Storage) -> ResponsePayloadPart:
21 # Process different types of request commands.
22 match rpp.body:
23 case RequestDataMessage(_) as data:
24 # To simplify the example, we skip the check for sending a message to oneself, etc.
25 match data.message:
26 case RequestMessageEmail(_) as email:
27 resp = self._chain.invoke({'input': email.text})
28 resp_email = ResponseMessageEmail.make(sender=email.to, to=email.sender, subj='Demo', text=resp)
29 return ResponseDataMessage.make(data.instance.id, data.resource_id, resp_email, data.payload_id)
30 case RequestMessageSms(_) as sms:
31 resp = self._chain.invoke({'input': sms.text})
32 resp_sms = ResponseMessageSms.make(sender=sms.receiver, receiver=sms.sender, text=resp)
33 return ResponseDataMessage.make(data.instance.id, data.resource_id, resp_sms, data.payload_id)
34 case RequestMessageSlack(_) as slack:
35 resp = self._chain.invoke({'input': slack.body['text']})
36 resp_slack = ResponseMessageSlack.make(channel=slack.body['channel'], text=resp)
37 return ResponseDataMessage.make(data.instance.id, data.resource_id, resp_slack, data.payload_id)
38 case _:
39 raise ValueError(f'Unexpected request: {data.message}')
40 case _:
41 # We skip all `interview`, `register`, `unregister`, `pause` and `resume` logic.
42 return make_default_response_payload(req_cmd=rpp.req_cmd, req_payload_part=rpp.body)
- Lines 3, 4 — import classes from the Python SDK library. This library provides asynchronous request handling as well as a number of utility methods.
- Line 17 — initialize OpenAI LLM and create the request processing chain.
- Line 13 — define the `HumatronWorkerChatGpt` class, which inherits from `HumatronAsyncWorker`, part of the Python SDK library.
- Line 20 — implement the abstract `process_payload_part` method from the `HumatronAsyncWorker` ABC class.
- Lines 27, 31, 35 — send requests to the LLM, with the `text` field value extracted from the request body.
- Line 26 — handle the message request with `channel_type` equal to `email`. We return an email response to the question contained in the email, sent to the sender's address.
- Line 30 — handle the message request with `channel_type` equal to `sms`. We return an SMS response to the question contained in the message, sent to the sender's phone number.
- Line 34 — handle the message request with `channel_type` equal to `slack`. We return a Slack message response to the question contained in the Slack request, sent to the channel from which the request was received.

# Web Server Integration Example.
import os
from dotenv import load_dotenv
from humatron.worker.rest.flask.flask_server import start_flask_server
from demo import HumatronWorkerChatGpt


# Start the REST server.
def start() -> None:
    """
    Load configuration from the environment and start the worker's Flask REST server.

    :raises KeyError: If any of the required environment variables is missing.
    """
    # Load the environment variables (from a `.env` file, if present).
    load_dotenv()

    # Get the tokens from the environment.
    req_token, resp_token = os.environ['HUMATRON_REQUEST_TOKEN'], os.environ['HUMATRON_RESPONSE_TOKEN']
    openai_api_key = os.environ['OPENAI_API_KEY']
    host, port, url = os.environ['REST_HOST'], int(os.environ['REST_PORT']), os.environ['REST_URL_WORKER']

    worker = HumatronWorkerChatGpt(openai_api_key=openai_api_key)
    # `worker.close()` is passed as the shutdown callback so the worker is released with the server.
    start_flask_server(worker, req_token, resp_token, host, port, url, None, None, lambda: worker.close())


if __name__ == '__main__':
    start()
- Line 4 — import classes from the Python SDK library.
- Lines 5, 18 — import and create a new instance of `HumatronWorkerChatGpt` we developed above.

class | description |
---|---|
RestChannelClient | The class mirrors the REST Channel API, providing a single method post to implement. This method accepts a request object as input and returns an optional response object. Note that due to the asynchronous nature of the REST channel, this post method may return a response to any previous request, the current request, or nothing at all if there is no data to return. Note that you will need to manage the request state and handle all asynchronous logic yourself. This approach is flexible but requires additional effort. |
RestChannelAsyncClient | The class is designed to operate in asynchronous mode, where you send requests and listen for responses. This class is initialized with a callback parameter, which handles the asynchronous response parts. Requests are sent asynchronously using the post method. |
RestChannelSyncClient | This is the synchronous version of the REST channel client. It features a single post method for sending requests and returning responses. Its API is similar to the RestChannelClient described above, with one significant difference: while RestChannelClient post method returns an optional currently accessible response object, RestChannelSyncClient post method waits for a response to its request and returns its result. |
# REST Channel Base Client Example.
import time

from humatron.channels.rest.client import *

client = RestChannelClient('https://humatron.ai/restchannel', 'your_token')

while True:
    req_txt = input('Request: ')
    req_payload = RequestMessagePayloadPart.make(req_txt, 'sender', 'receiver')
    req = RestChannelRequest.make_message(req_payload)
    resp = client.post(req)

    # A response to the provided payload can be received with either the initial or subsequent response.
    # Poll with heartbeat requests until a response referencing our payload id arrives.
    while not resp or resp.payload[0].ref_payload_id != req_payload.payload_id:
        time.sleep(1)
        resp = client.post(RestChannelRequest.make_heartbeat())

    resp_txt = resp.payload[0].text
    print(f'Response: {resp_txt}')
# REST Channel Asynchronous Client Example.
from humatron.channels.rest.client import *

# Maps payload id -> request text, so the callback can correlate a response with its request.
chat = dict()


def callback(resp_part: ResponseMessagePayloadPart) -> None:
    """
    Handle an asynchronously received response part by printing it
    together with the text of the request it refers to.

    :param resp_part: Response payload part delivered by the async client.
    """
    r = chat.get(resp_part.ref_payload_id, 'Unknown request')
    print(f'>>Response `{resp_part.text}` received in request `{r}`.')


with RestChannelAsyncClient('https://humatron.ai/restchannel', 'your_token', callback) as client:
    while True:
        req_txt = input('Request: ')
        payload = RequestMessagePayloadPart.make(req_txt, 'sender', 'receiver')
        # Remember the request text so `callback` can look it up by payload id.
        chat[payload.payload_id] = payload.text
        req = RestChannelRequest.make_message(payload)
        # The response to the provided payload will be received asynchronously.
        client.post(req)
# REST Channel Synchronous Client Example.
from humatron.channels.rest.client import *

client = RestChannelSyncClient('https://humatron.ai/restchannel', 'your_token')

while True:
    req_txt = input('Request: ')
    req = RestChannelRequest.make_message(RequestMessagePayloadPart.make(req_txt, 'sender', 'receiver'))
    # The response to the provided payload is received in a synchronous manner:
    # `post` blocks until the response to this very request is available.
    resp = client.post(req)
    resp_txt = resp.payload[0].text
    print(f'Response: {resp_txt}')