Python连接多个服务组件

同步

PyMongo

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017/")
client = pymongo.MongoClient("mongodb://user:password@localhost:27017/")

db = client.test_database

Redis

import redis

redis_pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_client = redis.Redis(connection_pool=redis_pool, decode_responses=True)

异步

AioMysql

# -*- coding: utf-8 -*-

# Author: 桑葚ICE
# Email: 152516cc@gmail.com
# Blog: iicey.github.io
# JueJin: juejin.im/user/5c64dce8e51d45013c40742c
import asyncio

import aiomysql


async def main():
    loop = asyncio.get_event_loop()
    conn = await aiomysql.connect(host='127.0.0.1',
                                  port=3306,
                                  user='root',
                                  password='',
                                  db='aio_test',
                                  loop=loop)

    sql = "INSERT INTO table_name(`field1`, `field2`) VALUES (%s, %s);"
    values = ("value1", "value2")

    async with conn.cursor() as cur:
        await cur.execute(query=sql, args=values)
        await conn.commit()

    conn.close()

async def test_example(loop):
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='',
                                      db='mysql', loop=loop)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            # 批量操作
            # values = []
            #     values.append(tuple(value), )
            # await cur.executemany(query=sql, args=values)
            print(cur.description)
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()


loop = asyncio.get_event_loop()
loop.run_until_complete(test_example(loop))

if __name__ == '__main__':
    asyncio.run(main())

AioKafka

from aiokafka import AIOKafkaProducer

# 生产
async def kafka_producer():
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)
    await producer.start()
    # 不等待
    await producer.send(topic, json.dumps(data, ensure_ascii=False).encode("utf-8"))
    # 等待
    await producer.send_and_wait(topic, json.dumps(data, ensure_ascii=False).encode("utf-8"))

# 消费
async def kafka_consumer():
    consumer = AIOKafkaConsumer(topic=topic, group_id=group_id, auto_offset_reset="earliest",bootstrap_servers=bootstrap_servers)
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value.decode())
    finally:
        await consumer.stop()

AioRedis

import aioredis

# 1.0版本
async def redis_connect():
    connection = await aioredis.create_pool(
        redis_address, password=redis_password,
        db=redis_db, encoding='utf-8'
    )
    # connection = await aioredis.create_pool('redis://{user}:{password}@{host}:{port}', encoding='utf-8')
    redis = aioredis.Redis(pool_or_conn=connection)
    return redis

# 2.0版本
async def redis_connect():
    pool = aioredis.ConnectionPool.from_url(
        redis_address, password=redis_password,
        db=redis_db, encoding='utf-8', decode_responses=True
    )
    redis = aioredis.Redis(connection_pool=pool)
    return redis

Telegram

from telethon import TelegramClient

async def tg_client():
    client = TelegramClient(
        session=session_name,
        api_id=api_id, api_hash=api_hash,
        proxy=api_proxy
    )
    return client