Skip to content

生产者/消费者模式

freqtrade提供了一种机制,其中一个实例(也称为consumer)可以通过消息Websocket从上游freqtrade实例(也称为producer)监听消息。主要有analyzed_dfwhitelist消息。这样可以在多个机器人之间重复使用计算出的指标(和信号)对不同交易对进行计算,而无需多次计算。

在Rest API文档中查看 消息Websocket 获取有关设置api_server配置以用于消息Websocket的信息(这将是您的producer)。

注意

我们强烈建议设置ws_token为随机的并且只有您自己知道,以防止未经授权的访问您的机器人。

配置

通过将 external_message_consumer 部分添加到consumer的配置文件中来启用订阅功能。

{
    //...
   "external_message_consumer": {
        "enabled": true,
        "producers": [
            {
                "name": "default", // 这可以是您喜欢的任何名称,默认值为"default"
                "host": "127.0.0.1", // 属于您的producer的api_server配置中的主机
                "port": 8080, // 属于您的producer的api_server配置中的端口
                "secure": false, // 使用安全的WebSocket连接,默认值为false
                "ws_token": "sercet_Ws_t0ken" // 属于您的producer的api_server配置中的ws_token
            }
        ],
        //以下配置是可选的,通常不需要
        // "wait_timeout": 300,
        // "ping_timeout": 10,
        // "sleep_time": 10,
        // "remove_entry_exit_signals": false,
        // "message_size_limit": 8
    }
    //...
}
参数 描述
enabled 必填项。 启用消费者模式。如果设置为false,则忽略此部分中的所有其他设置。
默认值为 false.
数据类型: boolean .
producers 必填项。 producer列表
数据类型: Array.
producers.name 必填项。 此producer的名称。如果使用多个producer,则必须在对get_producer_pairs()get_producer_df() 的调用中使用此名称。
数据类型: string
producers.host 必填项。 您的producer的主机名或IP地址。
数据类型: string
producers.port 必填项。 上述主机的端口。
默认值为 8080.
数据类型: Integer
producers.secure 可选项。 使用安全的Websockets连接,请使用ssl。默认为False。
数据类型: string
producers.ws_token 必填项。 配置在producer上的ws_token
数据类型: string
可选设置
wait_timeout 如果未收到消息,则在再次ping之前的超时时间。
默认值为 300.
数据类型: Integer - 以秒为单位。
ping_timeout ping超时时间
默认值为 10.
数据类型: Integer - 以秒为单位。
sleep_time 重新连接之前的睡眠时间。
默认值为 10.
数据类型: Integer - 以秒为单位。
remove_entry_exit_signals 在接收到dataframe时,将信号列从dataframe中删除(设置为0)。
默认为 false.
数据类型: Boolean.
message_size_limit 每条消息的大小限制
默认值为 8.
数据类型: Integer - 兆字节。

populate_indicators()中计算指标的时候(或者作为populate_indicators()的替代),follower实例会监听与producer实例的消息连接(或者在高级配置中监听与多个producer实例的连接),并请求producer最新分析的数据框架以获取活跃的白名单中每个交易对的数据。

然后,消费者实例将拥有完整的分析数据框架副本,无需自行计算它们。## 示例

示例 - 生产者策略

一个带有多个指标的简单策略。策略本身不需要特殊考虑。

class ProducerStrategy(IStrategy):
    #...
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        根据频率交易的标准方式计算指标,可以将这些指标广播给其他实例
        """
        dataframe['rsi'] = ta.RSI(dataframe)
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        dataframe['tema'] = ta.TEMA(dataframe, timeperiod=9)

        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        为给定的数据框填充入场信号
        """
        dataframe.loc[
            (
                (qtpylib.crossed_above(dataframe['rsi'], self.buy_rsi.value)) &
                (dataframe['tema'] <= dataframe['bb_middleband']) &
                (dataframe['tema'] > dataframe['tema'].shift(1)) &
                (dataframe['volume'] > 0)
            ),
            'enter_long'] = 1

        return dataframe

FreqAI

您可以在一台强大的机器上设置 FreqAI,同时在像树莓派这样的简单的机器上运行消费者,可以以不同的方式解释生产者生成的信号。

示例 - 消费者策略

逻辑上等价的策略,它本身不计算任何指标,但可以根据生产者计算的指标来进行交易决策。在此示例中,消费者具有相同的入场条件,但这并非必须。消费者可能使用不同的逻辑进入/退出交易,并且仅使用指定的指标。

class ConsumerStrategy(IStrategy):
    #...
    process_only_new_candles = False # 消费者需要的标志

    _columns_to_expect = ['rsi_default', 'tema_default', 'bb_middleband_default']

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        使用websocket api从另一个freqtrade实例获取预置指标。
        使用 `self.dp.get_producer_df(pair)` 获取数据框
        """
        pair = metadata['pair']
        timeframe = self.timeframe

        producer_pairs = self.dp.get_producer_pairs()
        # 你可以通过以下方式指定要获取数据对的生产者:
        # self.dp.get_producer_pairs("my_other_producer")

        # 此函数返回被分析的数据框以及分析的时间
        producer_dataframe, _ = self.dp.get_producer_df(pair)
        # 如果生产者提供了其他数据,你可以获取它:
        # self.dp.get_producer_df(
        #   pair,
        #   timeframe="1h",
        #   candle_type=CandleType.SPOT,
        #   producer_name="my_other_producer"
        # )

        if not producer_dataframe.empty:
            # 如果你打算直接传递生产者的进入/退出信号,
            # 请指定 ffill=False,否则会产生意外的结果
            merged_dataframe = merge_informative_pair(dataframe, producer_dataframe,
                                                      timeframe, timeframe,
                                                      append_timeframe=False,
                                                      suffix="default")
            return merged_dataframe
        else:
            dataframe[self._columns_to_expect] = 0

        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        为给定的数据框填充进入信号
        """
        # 使用数据框的列,就好像我们自己计算了它们一样
        dataframe.loc[
            (
                (qtpylib.crossed_above(dataframe['rsi_default'], self.buy_rsi.value)) &
                (dataframe['tema_default'] <= dataframe['bb_middleband_default']) &
                (dataframe['tema_default'] > dataframe['tema_default'].shift(1)) &
                (dataframe['volume'] > 0)
            ),
            'enter_long'] = 1

        return dataframe

使用上游信号

通过设置 remove_entry_exit_signals=false,你也可以直接使用生产者的信号。它们应该可用作 enter_long_default(假设使用了 suffix="default") - 可以直接用作信号,或作为附加指标使用。