[go: up one dir, main page]

0% found this document useful (0 votes)
19 views2 pages

Matrix Message Sender API

a simple python program

Uploaded by

Bhavan Sai
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
19 views2 pages

Matrix Message Sender API

a simple python program

Uploaded by

Bhavan Sai
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd
You are on page 1/ 2

import asyncio

import json
from flask import Flask, request, jsonify
from flask_cors import CORS
from nio import AsyncClient, LoginResponse

app = Flask(__name__)
CORS(app)

# Matrix credentials and room ID


username = "@bhavan_test_account:matrix.org"
password = "test_password_01"
room_id = "!AGeUOyHpLMMrLYAkXW:matrix.org"

# Default values
DEFAULT_HOMESERVER = "https://matrix.org"
DEFAULT_DEVICE_NAME = "matrix-nio"

client = None
logged_in = False

async def write_details_to_disk(resp: LoginResponse, homeserver) -> None:


"""Writes the required login details to disk for later use."""
with open("credentials.json", "w") as f:
json.dump({
"homeserver": homeserver,
"user_id": resp.user_id,
"device_id": resp.device_id,
"access_token": resp.access_token,
}, f)

async def login():


global client, logged_in
try:
homeserver = DEFAULT_HOMESERVER
device_name = DEFAULT_DEVICE_NAME

client = AsyncClient(homeserver, username)


pw = password
response = await client.login(password=pw, device_name=device_name)

if isinstance(response, LoginResponse):
print(f"Logged in successfully as {response.user_id}")
await write_details_to_disk(response, homeserver)
logged_in = True
else:
print(f"Failed to log in: {response}")
logged_in = False
except Exception as e:
print(f"Login exception: {e}")
logged_in = False

async def send_matrix_message(message):


global client
try:
if not logged_in:
await login()
if not logged_in:
raise Exception("Login failed")
await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": message},
)
print(f"Message '{message}' sent successfully!")
except Exception as e:
print(f"Failed to send message: {e}")

@app.route('/send', methods=['POST', 'OPTIONS'])


def send_message():
global logged_in
if request.method == 'OPTIONS':
return jsonify({'status': 'Options request successful'}), 200

try:
data = request.json
message = data['message']

asyncio.run(send_matrix_message(message))

return jsonify({'status': 'Message sent'}), 200


except Exception as e:
return jsonify({'status': f'Failed to send message: {str(e)}'}), 500

if __name__ == "__main__":
app.run(debug=True)

You might also like