Tech stack: our old friends
NestJS, along with ioredis and kafka-node
I've been studying the Kafka message queue recently, so I wanted to build a seckill (flash sale) demo to practice. I read quite a few blog posts but none of them came with a concrete project example, so I drew on various implementations and wrote a runnable project with NestJS.
Step 1: Create the project
Here we use nest cli commands to quickly generate the project template;
Install the @nestjs/cli scaffolding, which is used to generate the project;
npm i -g @nestjs/cli
nest new nest-seckill
cd ./nest-seckill
yarn
yarn add redis ioredis nestjs-redis kafka-node mysql2 typeorm uuid-random
Step 2: Generate the seckill module
Here we use nest cli commands to quickly generate the template code; for details see the official docs: nest-cli documentation
Generate seckill.service.ts; inside the service we use a Redis optimistic lock (watch) together with a transaction (multi) to implement the seckill logic, and then use a Kafka Producer to publish a message for later consumption;
nest g service seckill
Modify the generated content. The snippet below is the Redis client wrapper, RedisClientService (redis/redis.service.ts), which the seckill service injects to obtain the ioredis connection:
import { Injectable } from '@nestjs/common'
import { RedisService } from 'nestjs-redis'

@Injectable()
export class RedisClientService {
  constructor(private readonly redisService: RedisService) {}

  // Return the named ioredis client configured for the seckill flow
  async getSeckillRedisClient() {
    return await this.redisService.getClient('seckill')
  }
}
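Note that getClient('seckill') only works if nestjs-redis has been registered with a client named seckill somewhere (typically in app.module.ts). The connection options below are placeholder assumptions, not this project's real config:

import { Module } from '@nestjs/common'
import { RedisModule } from 'nestjs-redis'

@Module({
  imports: [
    // Register a named ioredis connection; host/port/db are placeholders
    RedisModule.register([{ name: 'seckill', host: '127.0.0.1', port: 6379, db: 0 }]),
  ],
})
export class AppModule {}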
Step 3: Write the seckill logic;
Define the seckill endpoint:
Add a Post route in seckill.controller.ts:
import { Body, Controller, Post } from '@nestjs/common'
import * as uuid from 'uuid-random'
import { CreateOrderDTO } from '../order/order.dto'
import { SeckillService } from './seckill.service'
import { awaitWrap } from '@/utils/index'

@Controller('seckill')
export class SeckillController {
  constructor(private readonly seckillService: SeckillService) {}

  @Post('/add')
  async addOrder(@Body() order: CreateOrderDTO) {
    // Generate a pseudo openid per request so repeated test calls look like different users
    const params: CreateOrderDTO = {
      ...order,
      openid: `${uuid()}-${new Date().valueOf()}`,
    }
    const [error, result] = await awaitWrap(this.seckillService.secKill(params))
    return error || result
  }
}
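The awaitWrap helper imported from @/utils is not shown in this post; judging from how it is used (it resolves to an [error, result] tuple), it is most likely a small wrapper along these lines (a sketch, not the project's actual code):

export function awaitWrap<T, E = Error>(promise: Promise<T>): Promise<[E | null, T | null]> {
  // Turn a promise into an [error, result] tuple so callers can skip try/catch
  return promise
    .then<[null, T]>(data => [null, data])
    .catch<[E, null]>(err => [err, null])
}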
Implement the seckill logic:
Add a secKill method in seckill.service.ts;
Use the Redis optimistic lock (watch) and transaction (multi) to modify the stock safely under concurrency; for details see the node redis documentation;
import { Injectable, Logger } from '@nestjs/common'
import * as kafka from 'kafka-node'
import * as Redis from 'ioredis'
import { RedisClientService } from '../redis/redis.service'
import { getConfig } from '@root/config/index'
import { awaitWrap } from '@/utils'

const { redisSeckill, kafkaConfig } = getConfig()

const kafkaClient = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost })
const producer = new kafka.Producer(kafkaClient, {
  requireAcks: 1, // wait for the leader to acknowledge the write
  ackTimeoutMs: 100,
  partitionerType: 2, // cyclic partitioner
})

@Injectable()
export class SeckillService {
  logger = new Logger('SeckillService')
  seckillRedisClient!: Redis.Redis
  count = 0

  constructor(private readonly redisClientService: RedisClientService) {
    this.redisClientService.getSeckillRedisClient().then(client => {
      this.seckillRedisClient = client
    })
  }

  /**
   * ***********************
   * @desc seckill implementation
   * ***********************
   */
  async secKill(params) {
    const { seckillCounterKey } = redisSeckill
    this.logger.log(`current request count: ${this.count++}`)

    // Optimistic lock: watch the stock counter so the transaction below aborts if it changes
    const [watchError] = await awaitWrap(this.seckillRedisClient.watch(seckillCounterKey))
    watchError && this.logger.error(watchError)
    if (watchError) return watchError

    const [getError, reply] = await awaitWrap(this.seckillRedisClient.get(seckillCounterKey))
    getError && this.logger.error(getError)
    if (getError) return getError

    if (parseInt(reply, 10) <= 0) {
      this.logger.warn('Sold out')
      return 'Sold out'
    }

    // Transaction: decrement the counter; exec() resolves to null if the watched key was modified
    const [execError, replies] = await awaitWrap(this.seckillRedisClient.multi().decr(seckillCounterKey).exec())
    execError && this.logger.error(execError)
    if (execError) return execError

    if (!replies) {
      this.logger.warn('counter was touched by another request, retrying')
      return this.secKill(params)
    }

    const payload = [
      {
        topic: kafkaConfig.topic,
        partition: 0,
        messages: [JSON.stringify(params)],
      },
    ]
    this.logger.log('producing payload:')
    this.logger.verbose(payload)

    return new Promise((resolve, reject) => {
      producer.send(payload, (err, kafkaProducerResponse) => {
        if (err) {
          this.logger.error(err)
          reject(err)
          return
        }
        this.logger.verbose(kafkaProducerResponse)
        resolve({ payload, kafkaProducerResponse })
      })
    })
  }
}
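The module's onApplicationBootstrap below also calls seckillService.initCount(), which is not shown in the original post. Presumably it just seeds the stock counter in Redis; a hedged sketch of what could be added to SeckillService (the initial stock of 100 is an assumption):

// Hypothetical initCount(): reset the stock counter before requests come in
async initCount() {
  const { seckillCounterKey } = redisSeckill
  const client = await this.redisClientService.getSeckillRedisClient()
  await client.set(seckillCounterKey, 100) // assumed initial stock
  this.logger.log(`init ${seckillCounterKey} = 100`)
}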
Listen for Kafka messages and consume the order queue messages;
Add a handleListenerKafkaMessage() method in seckill.module.ts to handle Kafka messages;
This method is called when the seckill module boots (onApplicationBootstrap), so it starts subscribing to Kafka messages at startup;
import { Logger, Module, OnApplicationBootstrap } from '@nestjs/common'
import * as Redis from 'ioredis'
import { awaitWrap } from '@/utils'
import { CreateOrderDTO } from '../order/order.dto'
import { OrderModule } from '../order/order.module'
import { OrderService } from '../order/order.service'
import { RedisClientService } from '../redis/redis.service'
import { getKafkaConsumer } from './kafka-utils'
import { SeckillController } from './seckill.controller'
import { SeckillService } from './seckill.service'
import { getConfig } from '@root/config'

const { kafkaConfig } = getConfig()

@Module({
  imports: [OrderModule],
  providers: [RedisClientService, SeckillService],
  controllers: [SeckillController],
})
export class SeckillModule implements OnApplicationBootstrap {
  logger = new Logger('SeckillModule')
  seckillRedisClient!: Redis.Redis

  constructor(
    private readonly orderService: OrderService,
    private readonly seckillService: SeckillService,
    private readonly redisClientService: RedisClientService
  ) {
    this.redisClientService.getSeckillRedisClient().then(client => {
      this.seckillRedisClient = client
    })
  }

  // Subscribe to the order topic and persist each consumed message as an order
  async handleListenerKafkaMessage() {
    const kafkaConsumer = getKafkaConsumer()
    kafkaConsumer.on('message', async message => {
      this.logger.log('message received from the producer:')
      this.logger.verbose(message)

      let order!: CreateOrderDTO
      if (typeof message.value === 'string') {
        order = JSON.parse(message.value)
      } else {
        order = JSON.parse(message.value.toString())
      }

      const [err, savedOrder] = await awaitWrap(this.orderService.saveOne(order))
      if (err) {
        this.logger.error(err)
        return
      }
      this.logger.log(`order [${savedOrder.id}] has been saved to the database`)
    })
  }

  async onApplicationBootstrap() {
    this.logger.log('onApplicationBootstrap: ')
    await this.seckillService.initCount()
    this.handleListenerKafkaMessage()
  }
}
The Kafka consumer helper getKafkaConsumer is implemented as follows;
Add a kafka-utils.ts file under the seckill module folder:
import * as kafka from 'kafka-node'
import * as Redis from 'ioredis'
import { getConfig } from '@root/config/index'
import { awaitWrap } from '@/utils'

const { kafkaConfig } = getConfig()

let kafkaConsumer!: kafka.Consumer

function getKafkaClient() {
  let kafkaClient!: kafka.KafkaClient
  // Lazily create the KafkaClient on first use
  return () => {
    if (!kafkaClient) {
      kafkaClient = new kafka.KafkaClient({
        kafkaHost: kafkaConfig.kafkaHost,
      })
    }
    return kafkaClient
  }
}

/**
 * @desc get the consumer instance (created once and reused)
 */
export function getKafkaConsumer() {
  const topics = [
    {
      topic: kafkaConfig.topic,
      partition: 0,
      offset: 0,
    },
  ]
  const options = {
    autoCommit: true,
    autoCommitIntervalMs: 5000,
    fromOffset: false,
  }
  const kafkaClient = getKafkaClient()()
  if (!kafkaConsumer) {
    kafkaConsumer = new kafka.Consumer(kafkaClient, topics, options)
  }
  return kafkaConsumer
}
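The getConfig helper from @root/config is not shown either; from the way it is used above, it returns at least the Redis counter key and the Kafka connection settings. A minimal sketch with placeholder values:

// config/index.ts (sketch): shape inferred from how getConfig() is used above
export function getConfig() {
  return {
    redisSeckill: {
      seckillCounterKey: 'seckill:counter', // key that holds the remaining stock
    },
    kafkaConfig: {
      kafkaHost: '127.0.0.1:9092', // placeholder broker address
      topic: 'seckill-order', // placeholder topic name
    },
  }
}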
The final file structure (roughly, judging from the imports above): a seckill folder (seckill.controller.ts, seckill.service.ts, seckill.module.ts, kafka-utils.ts), a redis folder (redis.service.ts), an order folder (order.dto.ts, order.service.ts, order.module.ts), plus the shared config and utils directories.
Run the project:
yarn dev
If you need to hit the seckill endpoint concurrently, you can use Postman's runner with multiple iterations; for a simple check of the endpoint logic, open the project's default swagger-ui page at http://localhost:3000/api-docs
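Besides Postman's runner, a small Node script (Node 18+ for the global fetch) is another quick way to fire concurrent requests; the path and body follow the controller above, everything else is illustrative:

// bench.ts (sketch): fire N concurrent seckill requests and print the responses
const N = 50

async function main() {
  const requests = Array.from({ length: N }, (_, i) =>
    fetch('http://localhost:3000/seckill/add', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ name: `user-${i}` }), // body fields other than openid are guesses
    }).then(res => res.text())
  )
  console.log(await Promise.all(requests))
}

main()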
At this point the core seckill logic is mostly done. Since the goal here is the seckill flow, the order module code is not expanded in this post; as in step 2, a few CLI commands are enough to create an Order module for basic order CRUD;
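Purely for illustration, with TypeORM 0.2-style APIs the Order entity and the saveOne method used by the Kafka consumer above might look roughly like this (field names other than openid and id are guesses):

import { Injectable } from '@nestjs/common'
import { Column, Entity, PrimaryGeneratedColumn, getRepository } from 'typeorm'

@Entity('orders')
export class OrderEntity {
  @PrimaryGeneratedColumn()
  id!: number

  @Column()
  openid!: string
}

@Injectable()
export class OrderService {
  // Persist one order coming out of the seckill flow
  async saveOne(order: Partial<OrderEntity>) {
    return getRepository(OrderEntity).save(order)
  }
}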
As for services like redis, mysql and kafka, you can write a docker-compose.yaml to spin them up quickly; see this project's code for details;
The kafka container may fail to start because of the CentOS firewall on the host; the fix is to stop the host firewall first and then restart docker;
After the kafka container is created, open the kafka-manager page mapped to port 9000 of that container in a browser, and create the cluster and our Topic there; the initialization steps are straightforward, so search for kafka-manager if you need details;
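If you would rather not click through the kafka-manager UI, kafka-node's KafkaClient can also create topics programmatically (Kafka 0.10+); treat this one-off script as an optional, hedged alternative:

import * as kafka from 'kafka-node'
import { getConfig } from '@root/config/index'

const { kafkaConfig } = getConfig()

// One-off script: create the seckill topic instead of using the kafka-manager UI
const client = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost })
client.createTopics(
  [{ topic: kafkaConfig.topic, partitions: 1, replicationFactor: 1 }],
  (error, result) => {
    console.log(error || result)
    client.close()
  }
)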