Worker 常用 6 种结构与适用场景

Worker 常用 6 种结构与适用场景

6 种 Worker 通信模式(附 Sync / Async 双版本骨架)

Worker ≈"干活的循环"

内部逻辑千差万别:查库、算指标、推流......

但绝大多数差异对外是黑盒,"业务怎么干"对调用方并不重要。

真正影响系统架构的是"它怎么被喂活、怎么回结果"

• 背压?

• 顺序?

• 要不要回包?

• 一条消息给几个人?

这些都体现在 通信语义 上,而通信语义一旦选错,全局都会出 Bug 。

所以先把"怎么收/发消息"定死,再去填"干什么活"最稳妥。

6 种模式之分,本质是

"一个队列 + 若干消费者"对外提供的契约不同:

记住一件事:

模式 =「消息通路 + 并发语义」 。

至于 Async 还是 Sync ,只是把

send()/recv() 换成 await send().await/recv().await 而已,

精髓不会变。

下面每节都包含:

什么时候选这种模式(Essence)

Async(Tokio)骨架

Sync(纯 std / crossbeam-channel)骨架

① Actor-Handle

Fire-and-Forget,保持顺序

Essence

单消费者,FIFO,不关心返回值;常用于日志、Webhook、顺序写库。

Async -- Tokio

rust

复制代码

use tokio::sync::mpsc; // 无界 => 永不阻塞

type Tx = mpsc::UnboundedSender;

pub fn spawn_actor() -> Tx {

let (tx, mut rx) = mpsc::unbounded_channel();

tokio::spawn(async move {

while let Some(t) = rx.recv().await {

handle(t).await; // 顺序执行

}

});

tx

}

Sync -- std

rust

复制代码

use std::{sync::mpsc, thread};

type Tx = mpsc::Sender;

pub fn spawn_actor() -> Tx {

let (tx, rx) = mpsc::channel();

thread::spawn(move || {

for t in rx { handle(t); } // 同样顺序、阻塞版

});

tx

}

② Pipe-Stream

流水线 + 背压

Essence

多阶段加工,每阶段都有有界队列;队列满→上游阻塞,实现背压。

Async -- Tokio

rust

复制代码

use tokio::sync::mpsc;

let (tx_a, mut rx_a) = mpsc::channel(1024); // stage A→B

let (tx_b, mut rx_b) = mpsc::channel(1024); // stage B→C

tokio::spawn(async move { // A

while let Some(raw) = src.recv().await {

tx_a.send(parse(raw)).await.unwrap();

}

});

tokio::spawn(async move { // B

while let Some(mid) = rx_a.recv().await {

tx_b.send(enrich(mid)).await.unwrap();

}

});

tokio::spawn(async move { // C

while let Some(fin) = rx_b.recv().await {

sink(fin).await;

}

});

Sync -- std

rust

复制代码

use std::{sync::mpsc, thread};

let (tx_a, rx_a) = mpsc::sync_channel(1024);

let (tx_b, rx_b) = mpsc::sync_channel(1024);

thread::spawn(move || for raw in src { tx_a.send(parse(raw)).unwrap(); });

thread::spawn(move || for mid in rx_a { tx_b.send(enrich(mid)).unwrap(); });

thread::spawn(move || for fin in rx_b { sink(fin); });

③ Call-Reply (RPC Actor)

要回包 / 错误

Essence

请求里夹一个 oneshot(单次返回通道);调用方同步/异步等待结果。

Async -- Tokio

rust

复制代码

use tokio::{sync::{mpsc, oneshot}};

struct Req {

data: MyReq,

resp: oneshot::Sender>,

}

let (tx, mut rx) = mpsc::channel::(128);

tokio::spawn(async move {

while let Some(req) = rx.recv().await {

let r = do_work(req.data).await;

let _ = req.resp.send(r);

}

});

pub async fn call(tx: &mpsc::Sender, d: MyReq) -> Result {

let (tx1, rx1) = oneshot::channel();

tx.send(Req { data: d, resp: tx1 }).await?;

rx1.await?

}

Sync -- std

rust

复制代码

use std::sync::mpsc::{channel, Sender};

struct Req {

data: MyReq,

resp: Sender>,

}

let (tx, rx) = channel::();

std::thread::spawn(move || {

for req in rx {

let r = do_work(req.data);

let _ = req.resp.send(r);

}

});

fn call(tx: &Sender, d: MyReq) -> Result {

let (tx1, rx1) = channel();

tx.send(Req { data: d, resp: tx1 }).unwrap();

rx1.recv().unwrap()

}

④ Worker-Pool

可并行的独立任务

Essence

多线程/多 task 共享同一有界队列;并行提升吞吐。

Async -- Tokio (spawn_blocking 也类似)

rust

复制代码

use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel::(1000);

for _ in 0..num_cpus::get() {

let mut rx = rx.clone();

tokio::spawn(async move {

while let Some(j) = rx.recv().await {

cpu_heavy(j).await;

}

});

}

Sync -- std

rust

复制代码

use std::{sync::mpsc, thread};

let (tx, rx) = mpsc::sync_channel::(1000);

for _ in 0..num_cpus::get() {

let rx = rx.clone();

thread::spawn(move || for j in rx { cpu_heavy(j) });

}

⑤ Broadcast / Pub-Sub

一条消息多份

Essence

生产者写一次,订阅者各拿一份;适合配置热更新、行情分发。

Async -- Tokio

rust

复制代码

use tokio::sync::broadcast;

let (tx, _) = broadcast::channel::(16);

let mut sub1 = tx.subscribe();

tokio::spawn(async move { while let Ok(e) = sub1.recv().await { handle1(e).await } });

let mut sub2 = tx.subscribe();

tokio::spawn(async move { while let Ok(e) = sub2.recv().await { handle2(e).await } });

tx.send(Event::Tick)?;

Sync -- crossbeam-channel

rust

复制代码

use crossbeam_channel::unbounded;

let (tx, rx) = unbounded::(); // `Receiver` 可 `clone`

let sub1 = rx.clone();

let sub2 = rx.clone();

std::thread::spawn(move || for e in sub1 { handle1(e) });

std::thread::spawn(move || for e in sub2 { handle2(e) });

tx.send(Event::Tick).unwrap();

⑥ Cron / Interval

按时间触发

Essence

无输入队列;固定周期触发任务,可带重试/监控。

Async -- Tokio

rust

复制代码

tokio::spawn(async move {

let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));

loop {

ticker.tick().await;

if let Err(e) = job().await { log::error!("cron failed: {e}") }

}

});

Sync -- std

rust

复制代码

std::thread::spawn(move || {

loop {

std::thread::sleep(std::time::Duration::from_secs(60));

if let Err(e) = job() { eprintln!("cron failed: {e}") }

}

});

怎样选?

只写、不回、要顺序 → Actor-Handle

多阶段流水 & 背压 → Pipe-Stream

要结果 / 错误回传 → Call-Reply

可并行、CPU 密集 → Worker-Pool

一条消息多订阅者 → Broadcast

按时间触发 → Cron

Pomelo_刘金 无论 Async 还是 Sync,都只是换了通道与等待方式 。

理解「队列类型 + 并发语义 + 背压/顺序/返回值」这三要素,

就能在任何运行时下快速拼出正确的 Worker。

相关推荐

dnf短剑哪个好
365投注规则

dnf短剑哪个好

📅 07-09 👁️ 3363
超凡進化
365dni讲解

超凡進化

📅 07-02 👁️ 9830
世界杯策划书.doc
365dni讲解

世界杯策划书.doc

📅 10-04 👁️ 4575
什么是网站空间?有哪些类型?
365投注规则

什么是网站空间?有哪些类型?

📅 09-28 👁️ 2463
方舟生存进化红方舟是哪个地图
365dni讲解

方舟生存进化红方舟是哪个地图

📅 01-05 👁️ 9050
浙江山上有猛兽吗 浙江最凶猛的三个野兽
365投注规则

浙江山上有猛兽吗 浙江最凶猛的三个野兽

📅 12-07 👁️ 6580
抢红包神器哪个最好用?8款抢红包神器下载推荐
lol菠菜的app什么靠谱_S14英雄联盟全球总决赛电竞赛事竞猜
《我来自江湖》百木园怎么走 百木园走法指南
365投注规则

《我来自江湖》百木园怎么走 百木园走法指南

📅 09-26 👁️ 6451