Python连接多个服务组件
2022-06-07
3 min read
# redis
# pymongo
# aiokafka
# kafka
# telegram
# 组件
# asyncio
# aiomysql
# aioredis
同步
PyMongo
import pymongo
# Two alternative ways to build the client are shown back to back; because
# both bind the same name, the second assignment is the one left in effect.
# Variant 1: connection URI without credentials.
client = pymongo.MongoClient("mongodb://localhost:27017/")
# Variant 2: connection URI carrying a user name and password.
client = pymongo.MongoClient("mongodb://user:password@localhost:27017/")
# Attribute access on the client selects the database named "test_database".
db = client.test_database
Redis
import redis
# Shared connection pool. Response decoding is a per-connection setting, so it
# is configured here, on the pool that creates the connections.
redis_pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
# The client borrows connections from the pool above. Connection options such
# as decode_responses are not repeated here: redis-py only uses them when it
# has to build its own pool, so passing them next to connection_pool has no
# effect and is misleading.
redis_client = redis.Redis(connection_pool=redis_pool)
异步
AioMysql
# -*- coding: utf-8 -*-
# Author: 桑葚ICE
# Email: 152516cc@gmail.com
# Blog: iicey.github.io
# JueJin: juejin.im/user/5c64dce8e51d45013c40742c
import asyncio
import aiomysql
async def main():
    """Insert one row into ``table_name`` over a single aiomysql connection.

    Connects to 127.0.0.1:3306 as ``root`` with an empty password (database
    ``aio_test``), executes a parameterized INSERT, commits it, and closes
    the connection. The connection is closed even when the INSERT or the
    commit raises.
    """
    # Inside a coroutine, the loop that drives it is the running loop.
    loop = asyncio.get_running_loop()
    conn = await aiomysql.connect(host='127.0.0.1',
                                  port=3306,
                                  user='root',
                                  password='',
                                  db='aio_test',
                                  loop=loop)
    # Values are passed separately from the SQL text (%s placeholders).
    sql = "INSERT INTO table_name(`field1`, `field2`) VALUES (%s, %s);"
    values = ("value1", "value2")
    try:
        async with conn.cursor() as cur:
            await cur.execute(query=sql, args=values)
        await conn.commit()
    finally:
        # Release the connection on both the success and the failure path.
        conn.close()
async def test_example(loop):
    """Run ``SELECT 42;`` through an aiomysql connection pool and check it.

    Args:
        loop: Event loop handed to ``aiomysql.create_pool``.

    Raises:
        AssertionError: If the single fetched column is not ``42``.

    The pool is closed and awaited in ``finally`` so it is released even
    when the query or the assertion fails.
    """
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='',
                                      db='mysql', loop=loop)
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 42;")
                # Batch operation (illustrative, left disabled):
                # values = []
                # values.append(tuple(value), )
                # await cur.executemany(query=sql, args=values)
                print(cur.description)
                (r,) = await cur.fetchone()
                assert r == 42
    finally:
        pool.close()
        await pool.wait_closed()
# Run the pooled example on an explicitly created loop.
# asyncio.get_event_loop() is deprecated when no loop is running, so the loop
# is created here directly and closed once the example has finished.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(test_example(loop))
finally:
    loop.close()

if __name__ == '__main__':
    # asyncio.run() creates and tears down its own loop for the insert example.
    asyncio.run(main())
AioKafka
import json

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
# Producer
async def kafka_producer():
    """Publish ``data`` as UTF-8 encoded JSON to ``topic``, twice.

    The first message goes through ``send`` and the second through
    ``send_and_wait``. ``bootstrap_servers``, ``topic`` and ``data`` are read
    from the enclosing scope. The producer is stopped in ``finally`` so it is
    released even when sending fails.
    """
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)
    await producer.start()
    try:
        # Both sends carry the same payload, so it is encoded once.
        payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        # Send without waiting for the delivery result.
        await producer.send(topic, payload)
        # Send and wait for the delivery result.
        await producer.send_and_wait(topic, payload)
    finally:
        await producer.stop()
# Consumer
async def kafka_consumer():
    """Consume ``topic`` and print every message value decoded as text.

    ``topic``, ``group_id`` and ``bootstrap_servers`` are read from the
    enclosing scope. The loop runs until the consumer's iterator ends or
    raises; the consumer is stopped in either case.
    """
    # AIOKafkaConsumer takes the topics to subscribe to as positional
    # arguments (*topics); it has no ``topic`` keyword parameter.
    consumer = AIOKafkaConsumer(
        topic,
        group_id=group_id,
        auto_offset_reset="earliest",
        bootstrap_servers=bootstrap_servers,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value.decode())
    finally:
        await consumer.stop()
AioRedis
import aioredis
# aioredis 1.x
async def redis_connect():
    """Create an aioredis 1.x pool and return a ``Redis`` client bound to it.

    ``redis_address``, ``redis_password`` and ``redis_db`` are read from the
    enclosing scope.
    """
    pool = await aioredis.create_pool(
        redis_address,
        db=redis_db,
        password=redis_password,
        encoding='utf-8',
    )
    # Alternative: put the credentials in the address URL instead.
    # pool = await aioredis.create_pool('redis://{user}:{password}@{host}:{port}', encoding='utf-8')
    return aioredis.Redis(pool_or_conn=pool)
# aioredis 2.x
async def redis_connect():
    """Build an aioredis 2.x connection pool from a URL and wrap it in ``Redis``.

    ``redis_address``, ``redis_password`` and ``redis_db`` are read from the
    enclosing scope.
    """
    pool_options = {
        'password': redis_password,
        'db': redis_db,
        'encoding': 'utf-8',
        'decode_responses': True,
    }
    connection_pool = aioredis.ConnectionPool.from_url(redis_address, **pool_options)
    return aioredis.Redis(connection_pool=connection_pool)
Telegram
from telethon import TelegramClient
async def tg_client():
    """Construct and return a ``TelegramClient``.

    ``session_name``, ``api_id``, ``api_hash`` and ``api_proxy`` are read from
    the enclosing scope. The client is only constructed here; this function
    makes no start or connect call.
    """
    client_options = {
        'session': session_name,
        'api_id': api_id,
        'api_hash': api_hash,
        'proxy': api_proxy,
    }
    return TelegramClient(**client_options)