customwebhookbot.py

  1#!/usr/bin/env python
  2# This program is dedicated to the public domain under the CC0 license.
  3# pylint: disable=import-error,wrong-import-position
  4"""
  5Simple example of a bot that uses a custom webhook setup and handles custom updates.
  6For the custom webhook setup, the libraries `starlette` and `uvicorn` are used. Please install
  7them as `pip install starlette~=0.20.0 uvicorn~=0.17.0`.
  8Note that any other `asyncio` based web server framework can be used for a custom webhook setup
  9just as well.
 10
 11Usage:
 12Set bot token, url, admin chat_id and port at the start of the `main` function.
 13You may also need to change the `listen` value in the uvicorn configuration to match your setup.
 14Press Ctrl-C on the command line or send a signal to the process to stop the bot.
 15"""
 16import asyncio
 17import html
 18import logging
 19from dataclasses import dataclass
 20from http import HTTPStatus
 21
 22import uvicorn
 23from starlette.applications import Starlette
 24from starlette.requests import Request
 25from starlette.responses import PlainTextResponse, Response
 26from starlette.routing import Route
 27
 28from telegram import __version__ as TG_VER
 29
 30try:
 31    from telegram import __version_info__
 32except ImportError:
 33    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]
 34
 35if __version_info__ < (20, 0, 0, "alpha", 1):
 36    raise RuntimeError(
 37        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
 38        f"{TG_VER} version of this example, "
 39        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
 40    )
 41
 42from telegram import Update
 43from telegram.constants import ParseMode
 44from telegram.ext import (
 45    Application,
 46    CallbackContext,
 47    CommandHandler,
 48    ContextTypes,
 49    ExtBot,
 50    TypeHandler,
 51)
 52
 53# Enable logging
 54logging.basicConfig(
 55    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
 56)
 57# set higher logging level for httpx to avoid all GET and POST requests being logged
 58logging.getLogger("httpx").setLevel(logging.WARNING)
 59
 60logger = logging.getLogger(__name__)
 61
 62
 63@dataclass
 64class WebhookUpdate:
 65    """Simple dataclass to wrap a custom update type"""
 66
 67    user_id: int
 68    payload: str
 69
 70
 71class CustomContext(CallbackContext[ExtBot, dict, dict, dict]):
 72    """
 73    Custom CallbackContext class that makes `user_data` available for updates of type
 74    `WebhookUpdate`.
 75    """
 76
 77    @classmethod
 78    def from_update(
 79        cls,
 80        update: object,
 81        application: "Application",
 82    ) -> "CustomContext":
 83        if isinstance(update, WebhookUpdate):
 84            return cls(application=application, user_id=update.user_id)
 85        return super().from_update(update, application)
 86
 87
 88async def start(update: Update, context: CustomContext) -> None:
 89    """Display a message with instructions on how to use this bot."""
 90    url = context.bot_data["url"]
 91    payload_url = html.escape(f"{url}/submitpayload?user_id=<your user id>&payload=<payload>")
 92    text = (
 93        f"To check if the bot is still running, call <code>{url}/healthcheck</code>.\n\n"
 94        f"To post a custom update, call <code>{payload_url}</code>."
 95    )
 96    await update.message.reply_html(text=text)
 97
 98
 99async def webhook_update(update: WebhookUpdate, context: CustomContext) -> None:
100    """Callback that handles the custom updates."""
101    chat_member = await context.bot.get_chat_member(chat_id=update.user_id, user_id=update.user_id)
102    payloads = context.user_data.setdefault("payloads", [])
103    payloads.append(update.payload)
104    combined_payloads = "</code>\n• <code>".join(payloads)
105    text = (
106        f"The user {chat_member.user.mention_html()} has sent a new payload. "
107        f"So far they have sent the following payloads: \n\n• <code>{combined_payloads}</code>"
108    )
109    await context.bot.send_message(
110        chat_id=context.bot_data["admin_chat_id"], text=text, parse_mode=ParseMode.HTML
111    )
112
113
114async def main() -> None:
115    """Set up the application and a custom webserver."""
116    url = "https://domain.tld"
117    admin_chat_id = 123456
118    port = 8000
119
120    context_types = ContextTypes(context=CustomContext)
121    # Here we set updater to None because we want our custom webhook server to handle the updates
122    # and hence we don't need an Updater instance
123    application = (
124        Application.builder().token("TOKEN").updater(None).context_types(context_types).build()
125    )
126    # save the values in `bot_data` such that we may easily access them in the callbacks
127    application.bot_data["url"] = url
128    application.bot_data["admin_chat_id"] = admin_chat_id
129
130    # register handlers
131    application.add_handler(CommandHandler("start", start))
132    application.add_handler(TypeHandler(type=WebhookUpdate, callback=webhook_update))
133
134    # Pass webhook settings to telegram
135    await application.bot.set_webhook(url=f"{url}/telegram", allowed_updates=Update.ALL_TYPES)
136
137    # Set up webserver
138    async def telegram(request: Request) -> Response:
139        """Handle incoming Telegram updates by putting them into the `update_queue`"""
140        await application.update_queue.put(
141            Update.de_json(data=await request.json(), bot=application.bot)
142        )
143        return Response()
144
145    async def custom_updates(request: Request) -> PlainTextResponse:
146        """
147        Handle incoming webhook updates by also putting them into the `update_queue` if
148        the required parameters were passed correctly.
149        """
150        try:
151            user_id = int(request.query_params["user_id"])
152            payload = request.query_params["payload"]
153        except KeyError:
154            return PlainTextResponse(
155                status_code=HTTPStatus.BAD_REQUEST,
156                content="Please pass both `user_id` and `payload` as query parameters.",
157            )
158        except ValueError:
159            return PlainTextResponse(
160                status_code=HTTPStatus.BAD_REQUEST,
161                content="The `user_id` must be a string!",
162            )
163
164        await application.update_queue.put(WebhookUpdate(user_id=user_id, payload=payload))
165        return PlainTextResponse("Thank you for the submission! It's being forwarded.")
166
167    async def health(_: Request) -> PlainTextResponse:
168        """For the health endpoint, reply with a simple plain text message."""
169        return PlainTextResponse(content="The bot is still running fine :)")
170
171    starlette_app = Starlette(
172        routes=[
173            Route("/telegram", telegram, methods=["POST"]),
174            Route("/healthcheck", health, methods=["GET"]),
175            Route("/submitpayload", custom_updates, methods=["POST", "GET"]),
176        ]
177    )
178    webserver = uvicorn.Server(
179        config=uvicorn.Config(
180            app=starlette_app,
181            port=port,
182            use_colors=False,
183            host="127.0.0.1",
184        )
185    )
186
187    # Run application and webserver together
188    async with application:
189        await application.start()
190        await webserver.serve()
191        await application.stop()
192
193
194if __name__ == "__main__":
195    asyncio.run(main())