nestedconversationbot.py

  1#!/usr/bin/env python
  2# pylint: disable=unused-argument, wrong-import-position
  3# This program is dedicated to the public domain under the CC0 license.
  4
  5"""
  6First, a few callback functions are defined. Then, those functions are passed to
  7the Application and registered at their respective places.
  8Then, the bot is started and runs until we press Ctrl-C on the command line.
  9
 10Usage:
 11Example of a bot-user conversation using nested ConversationHandlers.
 12Send /start to initiate the conversation.
 13Press Ctrl-C on the command line or send a signal to the process to stop the
 14bot.
 15"""
 16
 17import logging
 18from typing import Any, Dict, Tuple
 19
 20from telegram import __version__ as TG_VER
 21
 22try:
 23    from telegram import __version_info__
 24except ImportError:
 25    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]
 26
 27if __version_info__ < (20, 0, 0, "alpha", 1):
 28    raise RuntimeError(
 29        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
 30        f"{TG_VER} version of this example, "
 31        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
 32    )
 33from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
 34from telegram.ext import (
 35    Application,
 36    CallbackQueryHandler,
 37    CommandHandler,
 38    ContextTypes,
 39    ConversationHandler,
 40    MessageHandler,
 41    filters,
 42)
 43
 44# Enable logging
 45logging.basicConfig(
 46    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
 47)
 48# set higher logging level for httpx to avoid all GET and POST requests being logged
 49logging.getLogger("httpx").setLevel(logging.WARNING)
 50
 51logger = logging.getLogger(__name__)
 52
 53# State definitions for top level conversation
 54SELECTING_ACTION, ADDING_MEMBER, ADDING_SELF, DESCRIBING_SELF = map(chr, range(4))
 55# State definitions for second level conversation
 56SELECTING_LEVEL, SELECTING_GENDER = map(chr, range(4, 6))
 57# State definitions for descriptions conversation
 58SELECTING_FEATURE, TYPING = map(chr, range(6, 8))
 59# Meta states
 60STOPPING, SHOWING = map(chr, range(8, 10))
# Short alias for ConversationHandler.END; the callbacks below return it and also use
# str(END) as the callback data / pattern of their "Done" and "Back" buttons
END = ConversationHandler.END
 63
 64# Different constants for this example
 65(
 66    PARENTS,
 67    CHILDREN,
 68    SELF,
 69    GENDER,
 70    MALE,
 71    FEMALE,
 72    AGE,
 73    NAME,
 74    START_OVER,
 75    FEATURES,
 76    CURRENT_FEATURE,
 77    CURRENT_LEVEL,
 78) = map(chr, range(10, 22))
 79
 80
 81# Helper
 82def _name_switcher(level: str) -> Tuple[str, str]:
 83    if level == PARENTS:
 84        return "Father", "Mother"
 85    return "Brother", "Sister"
 86
 87
 88# Top level conversation callbacks
 89async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
 90    """Select an action: Adding parent/child or show data."""
 91    text = (
 92        "You may choose to add a family member, yourself, show the gathered data, or end the "
 93        "conversation. To abort, simply type /stop."
 94    )
 95
 96    buttons = [
 97        [
 98            InlineKeyboardButton(text="Add family member", callback_data=str(ADDING_MEMBER)),
 99            InlineKeyboardButton(text="Add yourself", callback_data=str(ADDING_SELF)),
100        ],
101        [
102            InlineKeyboardButton(text="Show data", callback_data=str(SHOWING)),
103            InlineKeyboardButton(text="Done", callback_data=str(END)),
104        ],
105    ]
106    keyboard = InlineKeyboardMarkup(buttons)
107
108    # If we're starting over we don't need to send a new message
109    if context.user_data.get(START_OVER):
110        await update.callback_query.answer()
111        await update.callback_query.edit_message_text(text=text, reply_markup=keyboard)
112    else:
113        await update.message.reply_text(
114            "Hi, I'm Family Bot and I'm here to help you gather information about your family."
115        )
116        await update.message.reply_text(text=text, reply_markup=keyboard)
117
118    context.user_data[START_OVER] = False
119    return SELECTING_ACTION
120
121
async def adding_self(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Begin collecting information about the user themselves.

    Stores SELF as the current level and replaces the menu message with a prompt
    that carries a single "Add info" button.

    Returns:
        DESCRIBING_SELF
    """
    query = update.callback_query
    context.user_data[CURRENT_LEVEL] = SELF

    # The button sends MALE as its callback data
    info_button = InlineKeyboardButton(text="Add info", callback_data=str(MALE))
    markup = InlineKeyboardMarkup.from_button(info_button)

    await query.answer()
    await query.edit_message_text(
        text="Okay, please tell me about yourself.", reply_markup=markup
    )

    return DESCRIBING_SELF
133
134
async def show_data(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Replace the current message with a summary of everything gathered so far.

    The summary has one section each for yourself, parents and children, followed
    by a "Back" button. START_OVER is set afterwards so the next menu is rendered
    by editing this message.

    Returns:
        SHOWING
    """

    def pretty_print(data: Dict[str, Any], level: str) -> str:
        """Format the people stored under ``level``, one per line."""
        entries = data.get(level)
        if not entries:
            return "\nNo information yet."

        if level == SELF:
            return "".join(
                f"\nName: {entry.get(NAME, '-')}, Age: {entry.get(AGE, '-')}"
                for entry in entries
            )

        # Relatives are prefixed with a gender-specific relation name
        male, female = _name_switcher(level)
        lines = []
        for entry in entries:
            relation = female if entry[GENDER] == FEMALE else male
            lines.append(
                f"\n{relation}: Name: {entry.get(NAME, '-')}, Age: {entry.get(AGE, '-')}"
            )
        return "".join(lines)

    stored = context.user_data
    sections = (("Yourself", SELF), ("Parents", PARENTS), ("Children", CHILDREN))
    text = "\n\n".join(f"{title}:{pretty_print(stored, level)}" for title, level in sections)

    back_button = InlineKeyboardButton(text="Back", callback_data=str(END))
    keyboard = InlineKeyboardMarkup([[back_button]])

    query = update.callback_query
    await query.answer()
    await query.edit_message_text(text=text, reply_markup=keyboard)
    stored[START_OVER] = True

    return SHOWING
170
171
async def stop(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """End the conversation by command.

    Clears the START_OVER flag and replies with a goodbye message.

    Returns:
        END
    """
    # START_OVER may still be set (show_data sets it before entering SHOWING). Clear it
    # so the next /start sends a fresh menu instead of trying to edit a message through
    # a callback query that a command update does not carry.
    context.user_data[START_OVER] = False

    await update.message.reply_text("Okay, bye.")

    return END
177
178
async def end(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Finish the conversation when the "Done" button of the top menu is pressed.

    Answers the callback query and replaces the menu message with a farewell.

    Returns:
        END
    """
    query = update.callback_query
    await query.answer()
    await query.edit_message_text(text="See you around!")

    return END
187
188
189# Second level conversation callbacks
async def select_level(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Show the second level menu: add a parent, add a child, show data, or go back.

    Returns:
        SELECTING_LEVEL
    """
    add_row = [
        InlineKeyboardButton(text="Add parent", callback_data=str(PARENTS)),
        InlineKeyboardButton(text="Add child", callback_data=str(CHILDREN)),
    ]
    nav_row = [
        InlineKeyboardButton(text="Show data", callback_data=str(SHOWING)),
        InlineKeyboardButton(text="Back", callback_data=str(END)),
    ]
    markup = InlineKeyboardMarkup([add_row, nav_row])

    query = update.callback_query
    await query.answer()
    await query.edit_message_text(
        text="You may add a parent or a child. Also you can show the gathered data or go back.",
        reply_markup=markup,
    )

    return SELECTING_LEVEL
209
210
async def select_gender(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Ask which relative of the chosen level to add.

    The callback data of the pressed button is stored as the current level and
    determines the labels of the two "Add ..." buttons.

    Returns:
        SELECTING_GENDER
    """
    query = update.callback_query
    chosen_level = query.data
    context.user_data[CURRENT_LEVEL] = chosen_level

    male_name, female_name = _name_switcher(chosen_level)

    add_row = [
        InlineKeyboardButton(text=f"Add {male_name}", callback_data=str(MALE)),
        InlineKeyboardButton(text=f"Add {female_name}", callback_data=str(FEMALE)),
    ]
    nav_row = [
        InlineKeyboardButton(text="Show data", callback_data=str(SHOWING)),
        InlineKeyboardButton(text="Back", callback_data=str(END)),
    ]
    markup = InlineKeyboardMarkup([add_row, nav_row])

    await query.answer()
    await query.edit_message_text(text="Please choose, whom to add.", reply_markup=markup)

    return SELECTING_GENDER
236
237
async def end_second_level(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Leave the second level conversation and redraw the top level menu.

    START_OVER is set first so that ``start`` edits the existing message rather
    than sending a new one.

    Returns:
        END
    """
    user_data = context.user_data
    user_data[START_OVER] = True

    await start(update, context)
    return END
244
245
246# Third level callbacks
async def select_feature(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Offer the features (name, age) that can be set for the current person.

    Without START_OVER this is a new person: the feature cache is reset to hold
    only the gender taken from the callback data, and the existing message is
    edited. With START_OVER (set after an input was saved) the menu is sent as a
    new message. The flag is cleared before returning.

    Returns:
        SELECTING_FEATURE
    """
    choices = (("Name", NAME), ("Age", AGE), ("Done", END))
    markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton(text=label, callback_data=str(data)) for label, data in choices]]
    )
    user_data = context.user_data

    if user_data.get(START_OVER):
        # Called after a text input was saved, so there is no button message to edit
        await update.message.reply_text(
            text="Got it! Please select a feature to update.", reply_markup=markup
        )
    else:
        # New person: start from a clean feature cache that remembers the gender
        query = update.callback_query
        user_data[FEATURES] = {GENDER: query.data}

        await query.answer()
        await query.edit_message_text(
            text="Please select a feature to update.", reply_markup=markup
        )

    user_data[START_OVER] = False
    return SELECTING_FEATURE
272
273
async def ask_for_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Remember which feature was picked and ask the user to type its value.

    Returns:
        TYPING
    """
    query = update.callback_query
    context.user_data[CURRENT_FEATURE] = query.data

    await query.answer()
    await query.edit_message_text(text="Okay, tell me.")

    return TYPING
283
284
async def save_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Store the typed text under the current feature and show the feature menu again.

    START_OVER is set so that ``select_feature`` sends a new message instead of
    editing one.

    Returns:
        Whatever ``select_feature`` returns.
    """
    store = context.user_data
    answer = update.message.text
    features = store[FEATURES]
    features[store[CURRENT_FEATURE]] = answer

    store[START_OVER] = True

    next_state = await select_feature(update, context)
    return next_state
293
294
async def end_describing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Save the collected features for the current level and show the parent menu.

    The feature dict is appended to the list stored under the current level. For
    SELF the top level menu is redrawn (with START_OVER set); for any other level
    the second level menu is shown.

    Returns:
        END
    """
    store = context.user_data
    level = store[CURRENT_LEVEL]

    # Make sure a list exists for this level before appending to it
    people = store.get(level) or []
    store[level] = people
    people.append(store[FEATURES])

    # Print upper level menu
    if level != SELF:
        await select_level(update, context)
        return END

    store[START_OVER] = True
    await start(update, context)
    return END
311
312
async def stop_nested(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
    """Say goodbye when /stop is sent inside a nested conversation.

    Returns:
        STOPPING, which the nested handlers translate via their ``map_to_parent``.
    """
    message = update.message
    await message.reply_text("Okay, bye.")

    return STOPPING
318
319
def main() -> None:
    """Build the three nested conversation handlers and run the bot with polling."""

    def exact(value: object) -> str:
        """Return a regex matching only the string form of ``value``."""
        return f"^{value}$"

    # Create the Application and pass it your bot's token.
    application = Application.builder().token("TOKEN").build()

    # Third level: collecting the features of one person
    description_conv = ConversationHandler(
        entry_points=[
            CallbackQueryHandler(select_feature, pattern=f"{exact(MALE)}|{exact(FEMALE)}")
        ],
        states={
            # Any button except "Done" selects a feature to type in
            SELECTING_FEATURE: [CallbackQueryHandler(ask_for_input, pattern=f"^(?!{END}).*$")],
            TYPING: [MessageHandler(filters.TEXT & ~filters.COMMAND, save_input)],
        },
        fallbacks=[
            CallbackQueryHandler(end_describing, pattern=exact(END)),
            CommandHandler("stop", stop_nested),
        ],
        map_to_parent={
            # Return to second level menu
            END: SELECTING_LEVEL,
            # End conversation altogether
            STOPPING: STOPPING,
        },
    )

    # Second level: adding a person
    add_member_conv = ConversationHandler(
        entry_points=[CallbackQueryHandler(select_level, pattern=exact(ADDING_MEMBER))],
        states={
            SELECTING_LEVEL: [
                CallbackQueryHandler(
                    select_gender, pattern=f"{exact(PARENTS)}|{exact(CHILDREN)}"
                )
            ],
            SELECTING_GENDER: [description_conv],
        },
        fallbacks=[
            CallbackQueryHandler(show_data, pattern=exact(SHOWING)),
            CallbackQueryHandler(end_second_level, pattern=exact(END)),
            CommandHandler("stop", stop_nested),
        ],
        map_to_parent={
            # After showing data return to top level menu
            SHOWING: SHOWING,
            # Return to top level menu
            END: SELECTING_ACTION,
            # End conversation altogether
            STOPPING: END,
        },
    )

    # Top level: selecting an action.
    # The third level conversation maps its END to SELECTING_LEVEL, so the top level
    # conversation must be able to handle that state too; both states share one list.
    selection_handlers = [
        add_member_conv,
        CallbackQueryHandler(show_data, pattern=exact(SHOWING)),
        CallbackQueryHandler(adding_self, pattern=exact(ADDING_SELF)),
        CallbackQueryHandler(end, pattern=exact(END)),
    ]
    top_level_states = {
        SHOWING: [CallbackQueryHandler(start, pattern=exact(END))],
        SELECTING_ACTION: selection_handlers,
        SELECTING_LEVEL: selection_handlers,
        DESCRIBING_SELF: [description_conv],
        STOPPING: [CommandHandler("start", start)],
    }
    conv_handler = ConversationHandler(
        entry_points=[CommandHandler("start", start)],
        states=top_level_states,
        fallbacks=[CommandHandler("stop", stop)],
    )

    application.add_handler(conv_handler)

    # Run the bot until the user presses Ctrl-C
    application.run_polling(allowed_updates=Update.ALL_TYPES)
399
400
401if __name__ == "__main__":
402    main()

State Diagram

flowchart TB
    %% Documentation: https://mermaid-js.github.io/mermaid/#/flowchart
    A(("/start")):::entryPoint -->|Hi! I'm FamilyBot...| B((SELECTING_ACTION)):::state
    B --> C("Show Data"):::userInput
    C --> |"(List of gathered data)"| D((SHOWING)):::state
    D --> E("Back"):::userInput
    E --> B
    B --> F("Add Yourself"):::userInput
    F --> G(("DESCRIBING_SELF")):::state
    G --> H("Add info"):::userInput
    H --> I((SELECT_FEATURE)):::state
    I --> |"Please select a feature to update. <br /> - Name <br /> - Age <br /> - Done"|J("(choice)"):::userInput
    J --> |"Okay, tell me."| K((TYPING)):::state
    K --> L("(text)"):::userInput
    L --> |"[saving]"|I
    I --> M("Done"):::userInput
    M --> B
    B --> N("Add family member"):::userInput
    R --> I
    W --> |"See you around!"|End(("END")):::termination
    Y(("ANY STATE")):::state --> Z("/stop"):::userInput
    Z -->|"Okay, bye."| End
    B --> W("Done"):::userInput
    subgraph nestedConversation[Nested Conversation: Add Family Member]
        direction BT
        N --> O(("SELECT_LEVEL")):::state
        O --> |"Add... <br /> - Add Parent <br /> - Add Child <br />"|P("(choice)"):::userInput
        P --> Q(("SELECT_GENDER")):::state
        Q --> |"- Mother <br /> - Father <br /> / <br /> - Sister <br /> - Brother"| R("(choice)"):::userInput
        Q --> V("Show Data"):::userInput
        Q --> T(("SELECTING_ACTION")):::state
        Q --> U("Back"):::userInput
        U --> T
        O --> U
        O --> V
        V --> S(("SHOWING")):::state
        V --> T
    end
    classDef userInput fill:#2a5279, color:#ffffff, stroke:#ffffff
    classDef state fill:#222222, color:#ffffff, stroke:#ffffff
    classDef entryPoint fill:#009c11, stroke:#42FF57, color:#ffffff
    classDef termination fill:#bb0007, stroke:#E60109, color:#ffffff
    style nestedConversation fill:#999999, stroke-width:2px, stroke:#333333