[docs]classChatbotService:"""Service class for handling chatbot interactions. Attributes: conv_repo (ConversationRepository): Repository for conversation data access. msg_repo (MessageRepository): Repository for message data access. assessment_repo (AssessmentRepository): Repository for assessment data access. user_agents (dict): A dictionary to store and manage a unique Manager Agent instance for each active user session, mapping user_id to agent instance. """CHATBOT_MAX_RETRIEVE_MSG=10CHATBOT_START_CONV_MSG="Hello! I'm Kusibot and I'm here to chat with you about how you're feeling today."CHATBOT_NO_MSG_PROVIDED='Sorry, but you didn\'t provide any message.'CHATBOT_NO_CONVERSATION='No current conversation found. Please start a new conversation.'def__init__(self):self.conv_repo=ConversationRepository()self.msg_repo=MessageRepository()self.assessment_repo=AssessmentRepository()self.user_agents={}
[docs]defcreate_or_get_conversation(self,user_id):""" Creates a new conversation for the authenticated user or gets the current conversation. Args: user_id: ID of the current user to retrieve or create the conversation from. Returns: messages: Array of JSON messages for the new/current conversation. """# Get the current conversation for the usercurrent_conv=self.conv_repo.get_current_conversation_by_user_id(user_id)# If there is no current conversation...ifnotcurrent_conv:# Create a new onecurrent_conv=self.conv_repo.create_conversation(user_id)# Save the initial message from the chatbotifcurrent_conv:self.msg_repo.save_chatbot_message(current_conv.id,self.CHATBOT_START_CONV_MSG,intent="Greeting")# Create a new user agent for the user if it doesn't existself._get_or_create_chatbot_manager(user_id)# Either a new conversation was created or the current one was found, return the messagesmessages=self.msg_repo.get_limited_messages(conv_id=current_conv.id,limit=self.CHATBOT_MAX_RETRIEVE_MSG)messages.reverse()return[{'text':msg.text,'is_user':msg.is_user,'timestamp':msg.timestamp.isoformat()+'Z'}formsginmessages]
[docs]defget_response(self,user_input,user_id):""" Generates a response from the chatbot. Args: user_input: The message sent by the user. user_id: ID of the user making the request. Returns: response: The chatbot's response to the user's input. """# Get the current conversation for the usercurrent_conv=self.conv_repo.get_current_conversation_by_user_id(user_id)# If there is no current conversation, create a new oneifnotcurrent_conv:returnself.CHATBOT_NO_CONVERSATION# Get the correct chatbot manager for the user.user_agent=self._get_or_create_chatbot_manager(user_id)# Response generation based on whether an assessment is active or notbot_response=user_agent.generate_bot_response(user_input,user_id,current_conv.id)# Saving user message firstself.msg_repo.save_user_message(conv_id=current_conv.id,msg=user_input,intent=bot_response["intent_detected"])# Storing bot response afterself.msg_repo.save_chatbot_message(conv_id=current_conv.id,msg=bot_response["agent_response"],intent=None,agent_type=bot_response["agent_type"])return{"agent_response":bot_response["agent_response"],"agent_type":bot_response["agent_type"],"intent_detected":bot_response["intent_detected"]}
[docs]def_get_or_create_chatbot_manager(self,user_id):""" Gets or creates a ChatbotManagerAgent for the user to handle chatbot interactions keeping user-specific state and conversation management. Args: user_id: ID of the user to get or create the chatbot manager for. Returns: ChatbotManagerAgent: The chatbot manager for the user. """ifuser_idnotinself.user_agents:self.user_agents[user_id]=ChatbotManagerAgent()returnself.user_agents[user_id]
[docs]def_clear_user_agent(self,user_id):""" Clears the chatbot manager for the user, effectively resetting their state. Args: user_id: ID of the user to clear the chatbot manager for. """ifuser_idinself.user_agents:delself.user_agents[user_id]
[docs]defend_conversation(self,user_id):""" Ends the current conversation and clear the user agent for the authenticated user. Args: user_id: ID of the current user to end the conversation for. Returns: bool: True if the conversation was successfully ended, False otherwise. """# Clear the user agent to reset the stateself._clear_user_agent(user_id)# Get the current conversation for the usercurrent_conv=self.conv_repo.get_current_conversation_by_user_id(user_id)# If there is a current conversation, end itifcurrent_conv:self.conv_repo.end_conversation(current_conv.id)# If there was an active assessment, end it as wellifself.assessment_repo.is_assessment_active(user_id):self.assessment_repo.end_assessment(user_id)returnTruereturnFalse
chatbot_service=ChatbotService()# Singleton instance of ChatbotService