跳转至

交易模块 API 参考

QKA 的交易接口模块,提供 QMT 交易服务器的客户端和服务器端实现。

qka.brokers.QMTClient

QMT 交易客户端类,提供与 QMT 交易服务器的通信接口。

QMT交易客户端类

提供与QMT交易服务器的通信接口,支持各种交易操作。

属性:

名称 类型 描述
base_url str

API服务器地址

session Session

HTTP会话对象

token str

访问令牌

headers Dict

HTTP请求头

Source code in qka/brokers/client.py
class QMTClient:
    """
    QMT交易客户端类

    提供与QMT交易服务器的通信接口,支持各种交易操作。

    Attributes:
        base_url (str): API服务器地址
        session (requests.Session): HTTP会话对象
        token (str): 访问令牌
        headers (Dict): HTTP请求头
    """

    def __init__(self, base_url: str = "http://localhost:8000", token: Optional[str] = None):
        """
        初始化交易客户端

        Args:
            base_url (str): API服务器地址,默认为本地8000端口
            token (str): 访问令牌,必须与服务器的token一致

        Raises:
            ValueError: 如果未提供token
        """
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        if not token:
            raise ValueError("必须提供访问令牌(token)")
        self.token = token
        self.headers = {"X-Token": self.token}

    def api(self, method_name: str, **params) -> Any:
        """
        通用调用接口方法

        Args:
            method_name (str): 要调用的接口名称
            **params: 接口参数,作为关键字参数传入

        Returns:
            Any: 接口返回的数据

        Raises:
            Exception: API调用失败时抛出异常
        """
        try:
            response = self.session.post(
                f"{self.base_url}/api/{method_name}",
                json=params or {},
                headers=self.headers
            )
            response.raise_for_status()
            result = response.json()

            if not result.get('success'):
                raise Exception(f"API调用失败: {result.get('detail')}")

            return result.get('data')
        except Exception as e:
            logger.error(f"调用 {method_name} 失败: {str(e)}")
            raise

__init__(base_url='http://localhost:8000', token=None)

初始化交易客户端

参数:

名称 类型 描述 默认
base_url str

API服务器地址,默认为本地8000端口

'http://localhost:8000'
token str

访问令牌,必须与服务器的token一致

None

引发:

类型 描述
ValueError

如果未提供token

源代码位于: qka/brokers/client.py
def __init__(self, base_url: str = "http://localhost:8000", token: Optional[str] = None):
    """
    初始化交易客户端

    Args:
        base_url (str): API服务器地址,默认为本地8000端口
        token (str): 访问令牌,必须与服务器的token一致

    Raises:
        ValueError: 如果未提供token
    """
    self.base_url = base_url.rstrip('/')
    self.session = requests.Session()
    if not token:
        raise ValueError("必须提供访问令牌(token)")
    self.token = token
    self.headers = {"X-Token": self.token}

api(method_name, **params)

通用调用接口方法

参数:

名称 类型 描述 默认
method_name str

要调用的接口名称

必需
**params

接口参数,作为关键字参数传入

{}

返回:

名称 类型 描述
Any Any

接口返回的数据

引发:

类型 描述
Exception

API调用失败时抛出异常

源代码位于: qka/brokers/client.py
def api(self, method_name: str, **params) -> Any:
    """
    通用调用接口方法

    Args:
        method_name (str): 要调用的接口名称
        **params: 接口参数,作为关键字参数传入

    Returns:
        Any: 接口返回的数据

    Raises:
        Exception: API调用失败时抛出异常
    """
    try:
        response = self.session.post(
            f"{self.base_url}/api/{method_name}",
            json=params or {},
            headers=self.headers
        )
        response.raise_for_status()
        result = response.json()

        if not result.get('success'):
            raise Exception(f"API调用失败: {result.get('detail')}")

        return result.get('data')
    except Exception as e:
        logger.error(f"调用 {method_name} 失败: {str(e)}")
        raise

使用示例

from qka.brokers.client import QMTClient

# 创建交易客户端
client = QMTClient(
    base_url="http://localhost:8000",
    token="服务器打印的token"
)

# 调用交易接口
assets = client.api("query_stock_asset")
print(assets)

qka.brokers.QMTServer

QMT 交易服务器类,将 QMT 交易接口封装为 RESTful API。

QMT交易服务器类

将QMT交易接口封装为RESTful API,支持远程调用交易功能。

属性:

名称 类型 描述
account_id str

账户ID

mini_qmt_path str

miniQMT安装路径

host str

服务器地址

port int

服务器端口

app FastAPI

FastAPI应用实例

trader

QMT交易对象

account

账户对象

token str

访问令牌

Source code in qka/brokers/server.py
class QMTServer:
    """
    QMT交易服务器类

    将QMT交易接口封装为RESTful API,支持远程调用交易功能。

    Attributes:
        account_id (str): 账户ID
        mini_qmt_path (str): miniQMT安装路径
        host (str): 服务器地址
        port (int): 服务器端口
        app (FastAPI): FastAPI应用实例
        trader: QMT交易对象
        account: 账户对象
        token (str): 访问令牌
    """

    def __init__(self, account_id: str, mini_qmt_path: str, host: str = "0.0.0.0", port: int = 8000, token: Optional[str] = None):
        """
        初始化交易服务器

        Args:
            account_id (str): 账户ID
            mini_qmt_path (str): miniQMT路径
            host (str): 服务器地址,默认0.0.0.0
            port (int): 服务器端口,默认8000
            token (Optional[str]): 可选的自定义token
        """
        self.account_id = account_id
        self.mini_qmt_path = mini_qmt_path
        self.host = host
        self.port = port
        self.app = FastAPI()
        self.trader = None
        self.account = None
        self.token = token if token else self.generate_token()  # 使用自定义token或生成固定token
        print(f"\n授权Token: {self.token}\n")  # 打印token供客户端使用

    def generate_token(self) -> str:
        """
        生成基于机器码的固定token

        Returns:
            str: 生成的token字符串
        """
        # 获取机器码(例如MAC地址)
        mac = uuid.getnode()
        # 将机器码转换为字符串
        mac_str = str(mac)
        # 使用SHA256哈希生成固定长度的token
        token = hashlib.sha256(mac_str.encode()).hexdigest()
        return token

    async def verify_token(self, x_token: str = Header(...)):
        """
        验证token的依赖函数

        Args:
            x_token (str): 请求头中的token

        Returns:
            str: 验证通过的token

        Raises:
            HTTPException: token无效时抛出401错误
        """
        if x_token != self.token:
            raise HTTPException(status_code=401, detail="无效的Token")
        return x_token

    def init_trader(self):
        """初始化交易对象"""
        self.trader, self.account = create_trader(self.account_id, self.mini_qmt_path)

    def convert_to_dict(self, obj) -> Dict[str, Any]:
        """
        将结果转换为可序列化的字典

        Args:
            obj: 任意对象

        Returns:
            Dict[str, Any]: 可序列化的字典
        """
        # 如果是基本类型,直接返回
        if isinstance(obj, (int, float, str, bool)):
            return obj
        # 如果已经是字典类型,直接返回
        elif isinstance(obj, dict):
            return obj
        # 如果是列表或元组,递归转换每个元素
        elif isinstance(obj, (list, tuple)):
            return [self.convert_to_dict(item) for item in obj]
        # 如果是自定义对象,获取所有公开属性
        elif hasattr(obj, '__dir__'):
            attrs = obj.__dir__()
            # 过滤掉内部属性和方法
            public_attrs = {attr: getattr(obj, attr)
                           for attr in attrs
                           if not attr.startswith('_') and not callable(getattr(obj, attr))}
            return public_attrs
        # 其他类型直接返回
        return str(obj)  # 将无法处理的类型转换为字符串

    def convert_method_to_endpoint(self, method_name: str, method):
        """
        将 XtQuantTrader 方法转换为 FastAPI 端点

        Args:
            method_name (str): 方法名称
            method: 方法对象
        """
        sig = inspect.signature(method)
        param_names = list(sig.parameters.keys())

        # 创建动态的请求模型
        class_fields = {
            '__annotations__': {}  # 添加类型注解字典
        }

        for param in param_names:
            if param in ['self', 'account']:
                continue
            class_fields['__annotations__'][param] = Any
            class_fields[param] = None

        RequestModel = type(f'{method_name}Request', (BaseModel,), class_fields)

        async def endpoint(request: RequestModel, token: str = Depends(self.verify_token)):
            try:
                params = request.dict(exclude_unset=True)
                if 'account' in param_names:
                    params['account'] = self.account
                result = getattr(self.trader, method_name)(**params)
                converted_result = self.convert_to_dict(result)
                return {'success': True, 'data': converted_result}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))

        self.app.post(f'/api/{method_name}')(endpoint)

    def setup_routes(self):
        """设置所有路由"""
        trader_methods = inspect.getmembers(
            self.trader.__class__,
            predicate=lambda x: inspect.isfunction(x) or inspect.ismethod(x)
        )

        excluded_methods = {'__init__', '__del__', 'register_callback', 'start', 'stop',
                          'connect', 'sleep', 'run_forever', 'set_relaxed_response_order_enabled'}

        for method_name, method in trader_methods:
            if not method_name.startswith('_') and method_name not in excluded_methods:
                self.convert_method_to_endpoint(method_name, method)

    def start(self):
        """启动服务器"""
        self.init_trader()
        self.setup_routes()
        uvicorn.run(self.app, host=self.host, port=self.port)

__init__(account_id, mini_qmt_path, host='0.0.0.0', port=8000, token=None)

初始化交易服务器

参数:

名称 类型 描述 默认
account_id str

账户ID

必需
mini_qmt_path str

miniQMT路径

必需
host str

服务器地址,默认0.0.0.0

'0.0.0.0'
port int

服务器端口,默认8000

8000
token Optional[str]

可选的自定义token

None
源代码位于: qka/brokers/server.py
def __init__(self, account_id: str, mini_qmt_path: str, host: str = "0.0.0.0", port: int = 8000, token: Optional[str] = None):
    """
    初始化交易服务器

    Args:
        account_id (str): 账户ID
        mini_qmt_path (str): miniQMT路径
        host (str): 服务器地址,默认0.0.0.0
        port (int): 服务器端口,默认8000
        token (Optional[str]): 可选的自定义token
    """
    self.account_id = account_id
    self.mini_qmt_path = mini_qmt_path
    self.host = host
    self.port = port
    self.app = FastAPI()
    self.trader = None
    self.account = None
    self.token = token if token else self.generate_token()  # 使用自定义token或生成固定token
    print(f"\n授权Token: {self.token}\n")  # 打印token供客户端使用

generate_token()

生成基于机器码的固定token

返回:

名称 类型 描述
str str

生成的token字符串

源代码位于: qka/brokers/server.py
def generate_token(self) -> str:
    """
    生成基于机器码的固定token

    Returns:
        str: 生成的token字符串
    """
    # 获取机器码(例如MAC地址)
    mac = uuid.getnode()
    # 将机器码转换为字符串
    mac_str = str(mac)
    # 使用SHA256哈希生成固定长度的token
    token = hashlib.sha256(mac_str.encode()).hexdigest()
    return token

verify_token(x_token=Header(...)) async

验证token的依赖函数

参数:

名称 类型 描述 默认
x_token str

请求头中的token

Header(...)

返回:

名称 类型 描述
str

验证通过的token

引发:

类型 描述
HTTPException

token无效时抛出401错误

源代码位于: qka/brokers/server.py
async def verify_token(self, x_token: str = Header(...)):
    """
    验证token的依赖函数

    Args:
        x_token (str): 请求头中的token

    Returns:
        str: 验证通过的token

    Raises:
        HTTPException: token无效时抛出401错误
    """
    if x_token != self.token:
        raise HTTPException(status_code=401, detail="无效的Token")
    return x_token

init_trader()

初始化交易对象

源代码位于: qka/brokers/server.py
def init_trader(self):
    """初始化交易对象"""
    self.trader, self.account = create_trader(self.account_id, self.mini_qmt_path)

convert_to_dict(obj)

将结果转换为可序列化的字典

参数:

名称 类型 描述 默认
obj

任意对象

必需

返回:

类型 描述
Dict[str, Any]

Dict[str, Any]: 可序列化的字典

源代码位于: qka/brokers/server.py
def convert_to_dict(self, obj) -> Dict[str, Any]:
    """
    将结果转换为可序列化的字典

    Args:
        obj: 任意对象

    Returns:
        Dict[str, Any]: 可序列化的字典
    """
    # 如果是基本类型,直接返回
    if isinstance(obj, (int, float, str, bool)):
        return obj
    # 如果已经是字典类型,直接返回
    elif isinstance(obj, dict):
        return obj
    # 如果是列表或元组,递归转换每个元素
    elif isinstance(obj, (list, tuple)):
        return [self.convert_to_dict(item) for item in obj]
    # 如果是自定义对象,获取所有公开属性
    elif hasattr(obj, '__dir__'):
        attrs = obj.__dir__()
        # 过滤掉内部属性和方法
        public_attrs = {attr: getattr(obj, attr)
                       for attr in attrs
                       if not attr.startswith('_') and not callable(getattr(obj, attr))}
        return public_attrs
    # 其他类型直接返回
    return str(obj)  # 将无法处理的类型转换为字符串

convert_method_to_endpoint(method_name, method)

将 XtQuantTrader 方法转换为 FastAPI 端点

参数:

名称 类型 描述 默认
method_name str

方法名称

必需
method

方法对象

必需
源代码位于: qka/brokers/server.py
def convert_method_to_endpoint(self, method_name: str, method):
    """
    将 XtQuantTrader 方法转换为 FastAPI 端点

    Args:
        method_name (str): 方法名称
        method: 方法对象
    """
    sig = inspect.signature(method)
    param_names = list(sig.parameters.keys())

    # 创建动态的请求模型
    class_fields = {
        '__annotations__': {}  # 添加类型注解字典
    }

    for param in param_names:
        if param in ['self', 'account']:
            continue
        class_fields['__annotations__'][param] = Any
        class_fields[param] = None

    RequestModel = type(f'{method_name}Request', (BaseModel,), class_fields)

    async def endpoint(request: RequestModel, token: str = Depends(self.verify_token)):
        try:
            params = request.dict(exclude_unset=True)
            if 'account' in param_names:
                params['account'] = self.account
            result = getattr(self.trader, method_name)(**params)
            converted_result = self.convert_to_dict(result)
            return {'success': True, 'data': converted_result}
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    self.app.post(f'/api/{method_name}')(endpoint)

setup_routes()

设置所有路由

源代码位于: qka/brokers/server.py
def setup_routes(self):
    """设置所有路由"""
    trader_methods = inspect.getmembers(
        self.trader.__class__,
        predicate=lambda x: inspect.isfunction(x) or inspect.ismethod(x)
    )

    excluded_methods = {'__init__', '__del__', 'register_callback', 'start', 'stop',
                      'connect', 'sleep', 'run_forever', 'set_relaxed_response_order_enabled'}

    for method_name, method in trader_methods:
        if not method_name.startswith('_') and method_name not in excluded_methods:
            self.convert_method_to_endpoint(method_name, method)

start()

启动服务器

源代码位于: qka/brokers/server.py
def start(self):
    """启动服务器"""
    self.init_trader()
    self.setup_routes()
    uvicorn.run(self.app, host=self.host, port=self.port)

使用示例

from qka.brokers.server import QMTServer

# 创建交易服务器
server = QMTServer(
    account_id="YOUR_ACCOUNT_ID",
    mini_qmt_path="YOUR_QMT_PATH"
)

# 启动服务器
server.start()

qka.brokers.trade

交易执行相关类和函数,包含订单、交易和持仓管理。

OrderSide

Bases: Enum

订单方向

Source code in qka/brokers/trade.py
class OrderSide(Enum):
    """订单方向"""
    BUY = "buy"
    SELL = "sell"

OrderType

Bases: Enum

订单类型

Source code in qka/brokers/trade.py
class OrderType(Enum):
    """订单类型"""
    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"
    STOP_LIMIT = "stop_limit"

OrderStatus

Bases: Enum

订单状态

Source code in qka/brokers/trade.py
class OrderStatus(Enum):
    """订单状态"""
    PENDING = "pending"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"

Order

订单对象

Source code in qka/brokers/trade.py
class Order:
    """订单对象"""

    def __init__(self, symbol: str, side: str, quantity: int, 
                 order_type: str = "market", price: Optional[float] = None,
                 order_id: Optional[str] = None):
        """
        初始化订单

        Args:
            symbol: 股票代码
            side: 买卖方向 ("buy" 或 "sell")
            quantity: 数量
            order_type: 订单类型
            price: 价格(限价单需要)
            order_id: 订单ID
        """
        self.symbol = symbol
        self.side = side
        self.quantity = quantity
        self.order_type = order_type
        self.price = price
        self.order_id = order_id or self._generate_order_id()
        self.status = OrderStatus.PENDING.value
        self.created_time = datetime.now()
        self.filled_quantity = 0
        self.remaining_quantity = quantity

    def _generate_order_id(self) -> str:
        """生成订单ID"""
        import uuid
        return str(uuid.uuid4())[:8]

__init__(symbol, side, quantity, order_type='market', price=None, order_id=None)

初始化订单

参数:

名称 类型 描述 默认
symbol str

股票代码

必需
side str

买卖方向 ("buy" 或 "sell")

必需
quantity int

数量

必需
order_type str

订单类型

'market'
price Optional[float]

价格(限价单需要)

None
order_id Optional[str]

订单ID

None
源代码位于: qka/brokers/trade.py
def __init__(self, symbol: str, side: str, quantity: int, 
             order_type: str = "market", price: Optional[float] = None,
             order_id: Optional[str] = None):
    """
    初始化订单

    Args:
        symbol: 股票代码
        side: 买卖方向 ("buy" 或 "sell")
        quantity: 数量
        order_type: 订单类型
        price: 价格(限价单需要)
        order_id: 订单ID
    """
    self.symbol = symbol
    self.side = side
    self.quantity = quantity
    self.order_type = order_type
    self.price = price
    self.order_id = order_id or self._generate_order_id()
    self.status = OrderStatus.PENDING.value
    self.created_time = datetime.now()
    self.filled_quantity = 0
    self.remaining_quantity = quantity

Trade

交易记录

Source code in qka/brokers/trade.py
class Trade:
    """交易记录"""

    def __init__(self, order_id: str, symbol: str, side: str, 
                 quantity: int, price: float, commission: float = 0.0):
        """
        初始化交易记录

        Args:
            order_id: 订单ID
            symbol: 股票代码
            side: 买卖方向
            quantity: 成交数量
            price: 成交价格
            commission: 手续费
        """
        self.order_id = order_id
        self.symbol = symbol
        self.side = side
        self.quantity = quantity
        self.price = price
        self.commission = commission
        self.trade_time = datetime.now()
        self.trade_value = quantity * price

__init__(order_id, symbol, side, quantity, price, commission=0.0)

初始化交易记录

参数:

名称 类型 描述 默认
order_id str

订单ID

必需
symbol str

股票代码

必需
side str

买卖方向

必需
quantity int

成交数量

必需
price float

成交价格

必需
commission float

手续费

0.0
源代码位于: qka/brokers/trade.py
def __init__(self, order_id: str, symbol: str, side: str, 
             quantity: int, price: float, commission: float = 0.0):
    """
    初始化交易记录

    Args:
        order_id: 订单ID
        symbol: 股票代码
        side: 买卖方向
        quantity: 成交数量
        price: 成交价格
        commission: 手续费
    """
    self.order_id = order_id
    self.symbol = symbol
    self.side = side
    self.quantity = quantity
    self.price = price
    self.commission = commission
    self.trade_time = datetime.now()
    self.trade_value = quantity * price

Position

持仓信息

Source code in qka/brokers/trade.py
class Position:
    """持仓信息"""

    def __init__(self, symbol: str, quantity: int = 0, avg_price: float = 0.0):
        """
        初始化持仓

        Args:
            symbol: 股票代码
            quantity: 持仓数量
            avg_price: 平均持仓价格
        """
        self.symbol = symbol
        self.quantity = quantity
        self.avg_price = avg_price
        self.market_value = 0.0
        self.unrealized_pnl = 0.0
        self.realized_pnl = 0.0

    def update_market_price(self, current_price: float):
        """更新市场价格和盈亏"""
        self.market_value = self.quantity * current_price
        if self.quantity > 0:
            self.unrealized_pnl = (current_price - self.avg_price) * self.quantity

__init__(symbol, quantity=0, avg_price=0.0)

初始化持仓

参数:

名称 类型 描述 默认
symbol str

股票代码

必需
quantity int

持仓数量

0
avg_price float

平均持仓价格

0.0
源代码位于: qka/brokers/trade.py
def __init__(self, symbol: str, quantity: int = 0, avg_price: float = 0.0):
    """
    初始化持仓

    Args:
        symbol: 股票代码
        quantity: 持仓数量
        avg_price: 平均持仓价格
    """
    self.symbol = symbol
    self.quantity = quantity
    self.avg_price = avg_price
    self.market_value = 0.0
    self.unrealized_pnl = 0.0
    self.realized_pnl = 0.0

update_market_price(current_price)

更新市场价格和盈亏

源代码位于: qka/brokers/trade.py
def update_market_price(self, current_price: float):
    """更新市场价格和盈亏"""
    self.market_value = self.quantity * current_price
    if self.quantity > 0:
        self.unrealized_pnl = (current_price - self.avg_price) * self.quantity

MyXtQuantTraderCallback

Bases: XtQuantTraderCallback

Source code in qka/brokers/trade.py
class MyXtQuantTraderCallback(XtQuantTraderCallback):
    def on_disconnected(self):
        """
        连接状态回调
        :return:
        """
        print("connection lost")
    def on_stock_order(self, order):
        """
        委托信息推送
        :param order: XtOrder对象
        :return:
        """
        # 委托
        if order.order_status == 50:
            logger.info(f"{BLUE}【已委托】{RESET} {parse_order_type(order.order_type)} 代码:{order.stock_code} 名称:{order.order_remark} 委托价格:{order.price:.2f} 委托数量:{order.order_volume} 订单编号:{order.order_id} 委托时间:{timestamp_to_datetime_string(convert_to_current_date(order.order_time))}")
        elif order.order_status == 53 or order.order_status == 54:
            logger.warning(f"{YELLOW}【已撤单】{RESET} {parse_order_type(order.order_type)} 代码:{order.stock_code} 名称:{order.order_remark} 委托价格:{order.price:.2f} 委托数量:{order.order_volume} 订单编号:{order.order_id} 委托时间:{timestamp_to_datetime_string(convert_to_current_date(order.order_time))}")

    def on_stock_trade(self, trade):
        """
        成交信息推送
        :param trade: XtTrade对象
        :return:
        """
        logger.info(f"{GREEN}【已成交】{RESET} {parse_order_type(trade.order_type)} 代码:{trade.stock_code} 名称:{trade.order_remark} 成交价格:{trade.traded_price:.2f} 成交数量:{trade.traded_volume} 成交编号:{trade.order_id} 成交时间:{timestamp_to_datetime_string(convert_to_current_date(trade.traded_time))}")

    def on_order_error(self, data):
        if data.order_id in error_orders:
            return
        error_orders.append(data.order_id)
        logger.error(f"{RED}【委托失败】{RESET}错误信息:{data.error_msg.strip()}")

    def on_cancel_error(self, data):
        if data.order_id in error_orders:
            return
        error_orders.append(data.order_id)
        logger.error(f"{RED}【撤单失败】{RESET}错误信息:{data.error_msg.strip()}")

on_disconnected()

连接状态回调 :return:

源代码位于: qka/brokers/trade.py
def on_disconnected(self):
    """
    连接状态回调
    :return:
    """
    print("connection lost")

on_stock_order(order)

委托信息推送 :param order: XtOrder对象 :return:

源代码位于: qka/brokers/trade.py
def on_stock_order(self, order):
    """
    委托信息推送
    :param order: XtOrder对象
    :return:
    """
    # 委托
    if order.order_status == 50:
        logger.info(f"{BLUE}【已委托】{RESET} {parse_order_type(order.order_type)} 代码:{order.stock_code} 名称:{order.order_remark} 委托价格:{order.price:.2f} 委托数量:{order.order_volume} 订单编号:{order.order_id} 委托时间:{timestamp_to_datetime_string(convert_to_current_date(order.order_time))}")
    elif order.order_status == 53 or order.order_status == 54:
        logger.warning(f"{YELLOW}【已撤单】{RESET} {parse_order_type(order.order_type)} 代码:{order.stock_code} 名称:{order.order_remark} 委托价格:{order.price:.2f} 委托数量:{order.order_volume} 订单编号:{order.order_id} 委托时间:{timestamp_to_datetime_string(convert_to_current_date(order.order_time))}")

on_stock_trade(trade)

成交信息推送 :param trade: XtTrade对象 :return:

源代码位于: qka/brokers/trade.py
def on_stock_trade(self, trade):
    """
    成交信息推送
    :param trade: XtTrade对象
    :return:
    """
    logger.info(f"{GREEN}【已成交】{RESET} {parse_order_type(trade.order_type)} 代码:{trade.stock_code} 名称:{trade.order_remark} 成交价格:{trade.traded_price:.2f} 成交数量:{trade.traded_volume} 成交编号:{trade.order_id} 成交时间:{timestamp_to_datetime_string(convert_to_current_date(trade.traded_time))}")

主要组件

Order 类

订单对象,表示一个交易订单。

Trade 类

交易记录,表示一个已成交的交易。

Position 类

持仓信息,表示一个持仓头寸。

create_trader 函数

创建 QMT 交易对象的便捷函数。

使用示例

from qka.brokers.trade import create_trader, Order

# 创建交易对象
trader, account = create_trader(account_id, mini_qmt_path)

# 创建订单
order = Order(
    symbol='000001.SZ',
    side='buy',
    quantity=1000,
    order_type='market'
)

模块导入方式

交易模块需要从子模块导入:

# 客户端
from qka.brokers.client import QMTClient

# 服务器
from qka.brokers.server import QMTServer

# 交易执行
from qka.brokers.trade import create_trader, Order, Trade, Position

工作流程

1. 启动交易服务器

from qka.brokers.server import QMTServer

server = QMTServer(
    account_id="123456789",
    mini_qmt_path="D:/qmt"
)
server.start()  # 会打印token供客户端使用

2. 使用交易客户端

from qka.brokers.client import QMTClient

client = QMTClient(
    base_url="http://localhost:8000",
    token="服务器打印的token"
)

# 查询账户信息
assets = client.api("query_stock_asset")

# 下单交易
from xtquant import xtconstant
result = client.api(
    "order_stock",
    stock_code='600000.SH',
    order_type=xtconstant.STOCK_BUY,
    order_volume=1000,
    price_type=xtconstant.FIX_PRICE,
    price=10.5
)

注意事项

  1. QMT 依赖: 需要安装 QMT 并正确配置环境
  2. 网络连接: 确保服务器和客户端网络连通
  3. 权限验证: 使用 token 进行身份验证
  4. 错误处理: 妥善处理网络错误和交易失败

相关链接