Node.js微服务架构设计与实现
•编程技术
Node.js微服务架构设计与实现
微服务架构是现代应用开发的主流模式。本文将介绍如何使用Node.js构建可扩展、高可用的微服务系统。
微服务架构概述
架构特点
- 服务独立性:每个服务独立开发、部署、扩展
- 技术多样性:不同服务可以使用不同技术栈
- 故障隔离:单个服务的故障被限制在该服务边界内,配合超时、断路器等机制可避免拖垮整个系统
- 弹性扩展:根据负载独立扩展服务
服务拆分原则
按业务领域拆分:
├── 用户服务 (User Service)
├── 订单服务 (Order Service)
├── 支付服务 (Payment Service)
├── 库存服务 (Inventory Service)
└── 通知服务 (Notification Service)
服务通信
1. HTTP/REST API
// 用户服务
import express from 'express';
import axios from 'axios';
// Create the Express application and parse JSON request bodies.
const app = express();
app.use(express.json());
// Base URLs of the downstream services: read from environment variables,
// falling back to fixed localhost URLs when a variable is unset or empty.
// NOTE(review): despite the original "service discovery" label, these values
// are static and resolved once, when this object literal is evaluated.
const SERVICES = {
ORDER_SERVICE: process.env.ORDER_SERVICE_URL || 'http://localhost:3002',
PAYMENT_SERVICE: process.env.PAYMENT_SERVICE_URL || 'http://localhost:3003'
};
/**
 * POST /users — creates a user from the JSON request body.
 *
 * Responds 201 with whatever `createUser` resolves to, or 500 with an
 * `{ error }` payload when creation throws.
 */
app.post('/users', async (req, res) => {
  try {
    const user = await createUser(req.body);
    res.status(201).json(user);
  } catch (error: unknown) {
    // A thrown value is not guaranteed to be an Error; narrow before reading
    // `.message` so the client always receives a string in `error`.
    const message = error instanceof Error ? error.message : 'Internal server error';
    res.status(500).json({ error: message });
  }
});
/**
 * GET /users/:userId/orders — returns a user's orders by calling the order service.
 *
 * Responses:
 * - 404 when `getUser` finds no user;
 * - 504 when the order-service call times out (5 s);
 * - the upstream status and body when the order service answers with an error;
 * - 500 for any other failure.
 */
app.get('/users/:userId/orders', async (req, res) => {
  try {
    const { userId } = req.params;

    // Verify the user exists before calling downstream.
    const user = await getUser(userId);
    if (!user) {
      res.status(404).json({ error: 'User not found' });
      return;
    }

    const response = await axios.get(`${SERVICES.ORDER_SERVICE}/orders`, {
      // Let axios serialise the query string: a userId containing `&`, `=` or
      // `#` must not be able to inject extra parameters into the upstream call.
      params: { userId },
      headers: {
        // Propagate the caller's request id so the call can be traced end to end.
        'X-Request-ID': req.headers['x-request-id'] || generateRequestId(),
        'X-User-ID': userId
      },
      timeout: 5000
    });
    res.json(response.data);
  } catch (error: unknown) {
    if (axios.isAxiosError(error)) {
      if (error.code === 'ECONNABORTED') {
        res.status(504).json({ error: 'Order service timeout' });
        return;
      }
      if (error.response) {
        // Relay the upstream status and body unchanged.
        res.status(error.response.status).json(error.response.data);
        return;
      }
    }
    // Thrown values are not guaranteed to be Error instances.
    const message = error instanceof Error ? error.message : 'Internal server error';
    res.status(500).json({ error: message });
  }
});
/** Tuning options for {@link CircuitBreaker}; every field is optional. */
interface CircuitBreakerOptions {
  /** Number of consecutive failures that opens the circuit. Default: 5. */
  failureThreshold?: number;
  /** Milliseconds the circuit stays OPEN before one trial call is let through. Default: 60000. */
  timeout?: number;
}

/**
 * Circuit breaker around an async operation.
 *
 * States:
 * - CLOSED: calls pass through; consecutive failures are counted.
 * - OPEN: calls are rejected immediately until the cool-down has elapsed.
 * - HALF_OPEN: the next call is a trial; success closes the circuit, failure
 *   re-opens it for another cool-down.
 *
 * NOTE(review): there is no limit on concurrent trial calls while HALF_OPEN.
 */
class CircuitBreaker {
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  private failureCount = 0;
  private readonly failureThreshold: number;
  private readonly timeout: number;
  private nextAttempt = Date.now();

  /**
   * @param options Optional overrides; `new CircuitBreaker()` keeps the
   *   previous fixed behaviour (5 failures, 60 s cool-down).
   */
  constructor(options: CircuitBreakerOptions = {}) {
    // `??` rather than `||` so that an explicit 0 is honoured.
    this.failureThreshold = options.failureThreshold ?? 5;
    this.timeout = options.timeout ?? 60000;
  }

  /**
   * Runs `fn` under the breaker.
   *
   * @param fn Operation to protect.
   * @returns The value `fn` resolves to.
   * @throws Error('Circuit breaker is OPEN') without calling `fn` while the
   *   circuit is open and cooling down; otherwise rethrows whatever `fn` throws.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error('Circuit breaker is OPEN');
      }
      // Cool-down elapsed: allow a trial call.
      this.state = 'HALF_OPEN';
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Any success resets the failure streak and closes the circuit. */
  private onSuccess(): void {
    this.failureCount = 0;
    this.state = 'CLOSED';
  }

  /** Counts a failure and opens the circuit when warranted. */
  private onFailure(): void {
    this.failureCount++;
    // A failed trial call re-opens the circuit regardless of the counter.
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.timeout;
    }
  }
}
/** Breaker shared by every request that calls the order service. */
const orderServiceBreaker = new CircuitBreaker();

/**
 * GET /users/:userId/orders-safe — same lookup as the plain orders route, but
 * routed through the circuit breaker. Any failure, including an open circuit,
 * is answered with 503.
 */
app.get('/users/:userId/orders-safe', async (req, res) => {
  try {
    const orders = await orderServiceBreaker.execute(async () => {
      const response = await axios.get(`${SERVICES.ORDER_SERVICE}/orders`, {
        // Serialised by axios so the id cannot inject extra query parameters.
        params: { userId: req.params.userId },
        timeout: 5000
      });
      return response.data;
    });
    res.json(orders);
  } catch (error: unknown) {
    // The client gets a generic 503; keep the cause visible to operators
    // instead of discarding it.
    console.error('Order service call failed:', error);
    res.status(503).json({ error: 'Service temporarily unavailable' });
  }
});
2. gRPC通信
// user.proto
// Protobuf (proto3) schema for the user service used by the gRPC example.
syntax = "proto3";
package user;
// Unary RPCs for reading, creating, updating and deleting a user.
service UserService {
rpc GetUser (GetUserRequest) returns (User);
rpc CreateUser (CreateUserRequest) returns (User);
rpc UpdateUser (UpdateUserRequest) returns (User);
rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
}
// Identifies the user to fetch.
message GetUserRequest {
string id = 1;
}
// Data for a new user. Note that `password` is accepted here but is not part
// of the `User` message returned by the RPC.
message CreateUserRequest {
string name = 1;
string email = 2;
string password = 3;
}
// Identifies the user to change and carries the new name and email.
message UpdateUserRequest {
string id = 1;
string name = 2;
string email = 3;
}
// Identifies the user to delete.
message DeleteUserRequest {
string id = 1;
}
// User as returned to clients; timestamps are carried as strings.
message User {
string id = 1;
string name = 2;
string email = 3;
string created_at = 4;
string updated_at = 5;
}
// Outcome of a delete call.
message DeleteUserResponse {
bool success = 1;
string message = 2;
}
// gRPC服务端
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
// Path of the schema file handed to the loader below.
const PROTO_PATH = './user.proto';
// Load the .proto definition synchronously with the listed loader options.
// NOTE(review): the exact meaning of keepCase / longs / enums / defaults /
// oneofs is defined by @grpc/proto-loader — confirm against its documentation.
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
// Build the gRPC object tree and select the `user` package from it.
const userProto = grpc.loadPackageDefinition(packageDefinition).user;
/** User record kept in the in-memory store; fields mirror the `User` proto message. */
interface StoredUser {
  id: string;
  name: string;
  email: string;
  created_at: string;
  updated_at: string;
}

/** Minimal view of a unary gRPC call: the handlers only read the decoded request. */
interface UnaryCall<Req> {
  request: Req;
}

/** Completion callback of a unary handler: an error status, or `null` plus the reply. */
type UnaryCallback<Res> = (
  error: { code: number; message: string } | null,
  value?: Res
) => void;

/** In-memory user store keyed by user id. Contents are lost when the process exits. */
const users = new Map<string, StoredUser>();

/** Handlers for the RPCs declared by `UserService` in user.proto. */
const userService = {
  /** Replies with the stored user, or NOT_FOUND when the id is unknown. */
  getUser: (call: UnaryCall<{ id: string }>, callback: UnaryCallback<StoredUser>): void => {
    const user = users.get(call.request.id);
    if (user) {
      callback(null, user);
    } else {
      callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
    }
  },

  /** Creates a user under a freshly generated id and replies with the new record. */
  createUser: (
    call: UnaryCall<{ name: string; email: string; password: string }>,
    callback: UnaryCallback<StoredUser>
  ): void => {
    // Copy only the profile fields. Spreading the whole request would also
    // copy the plaintext `password` into the stored record.
    // NOTE(review): credentials are not persisted by this demo store; hash and
    // store them separately if authentication is needed.
    const { name, email } = call.request;
    // One clock read so both timestamps are identical on creation.
    const now = new Date().toISOString();
    const user: StoredUser = {
      id: generateId(),
      name,
      email,
      created_at: now,
      updated_at: now
    };
    users.set(user.id, user);
    callback(null, user);
  },

  /** Updates name and/or email of an existing user; replies NOT_FOUND for unknown ids. */
  updateUser: (
    call: UnaryCall<{ id: string; name: string; email: string }>,
    callback: UnaryCallback<StoredUser>
  ): void => {
    const { id, name, email } = call.request;
    const existing = users.get(id);
    if (!existing) {
      callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
      return;
    }
    // proto3 strings have no "unset" marker: a field the client left out
    // arrives as '' when the loader fills in defaults. Treat '' as "keep the
    // current value" so a partial update cannot blank out stored data.
    const updatedUser: StoredUser = {
      ...existing,
      name: name || existing.name,
      email: email || existing.email,
      updated_at: new Date().toISOString()
    };
    users.set(id, updatedUser);
    callback(null, updatedUser);
  },

  /** Removes a user; replies NOT_FOUND when the id is unknown. */
  deleteUser: (
    call: UnaryCall<{ id: string }>,
    callback: UnaryCallback<{ success: boolean; message: string }>
  ): void => {
    // Map#delete reports whether an entry was actually removed.
    if (users.delete(call.request.id)) {
      callback(null, { success: true, message: 'User deleted' });
    } else {
      callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
    }
  }
};
/**
 * Entry point of the gRPC example: creates a server, mounts the user service
 * handlers and binds to 0.0.0.0:50051 with insecure credentials. Binding
 * errors are logged; on success the bound port is logged and the server is
 * started.
 */
function main() {
  const grpcServer = new grpc.Server();
  grpcServer.addService(userProto.UserService.service, userService);

  // Invoked once the bind attempt has finished.
  const onBound = (bindError: Error | null, boundPort: number): void => {
    if (!bindError) {
      console.log(`gRPC server running at port ${boundPort}`);
      grpcServer.start();
      return;
    }
    console.error(bindError);
  };

  grpcServer.bindAsync(
    '0.0.0.0:50051',
    grpc.ServerCredentials.createInsecure(),
    onBound
  );
}
main();
3. 消息队列
// 使用RabbitMQ
import amqp from 'amqplib';
/**
 * Thin wrapper around one amqplib connection and one channel.
 *
 * Call {@link MessageQueue.connect} before publishing or consuming; the other
 * methods throw a descriptive error when no channel is open.
 */
class MessageQueue {
  private connection?: amqp.Connection;
  private channel?: amqp.Channel;

  /** Opens the connection and a channel on it. */
  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  /** Returns the open channel or fails with a clear message instead of a TypeError. */
  private requireChannel() {
    if (!this.channel) {
      throw new Error('MessageQueue is not connected; call connect() first');
    }
    return this.channel;
  }

  /**
   * Publishes `message` as persistent JSON to a durable topic exchange.
   *
   * @param exchange Exchange name; asserted as a durable `topic` exchange.
   * @param routingKey Routing key attached to the message.
   * @param message Any JSON-serialisable value.
   */
  async publish(exchange: string, routingKey: string, message: unknown): Promise<void> {
    const channel = this.requireChannel();
    await channel.assertExchange(exchange, 'topic', { durable: true });
    channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }

  /**
   * Consumes JSON messages from a durable queue.
   *
   * @param queue Queue name; asserted as durable.
   * @param handler Called with the parsed body; the message is acked once it resolves.
   * @param binding Optional: bind the queue to this topic exchange with this
   *   pattern before consuming. Without a binding the queue only receives
   *   messages routed to it by other means.
   */
  async consume(
    queue: string,
    handler: (msg: any) => Promise<void>,
    binding?: { exchange: string; pattern: string }
  ): Promise<void> {
    const channel = this.requireChannel();
    await channel.assertQueue(queue, { durable: true });
    if (binding) {
      await channel.assertExchange(binding.exchange, 'topic', { durable: true });
      await channel.bindQueue(queue, binding.exchange, binding.pattern);
    }
    // Awaited so that a failure to start consuming reaches the caller.
    await channel.consume(queue, async (msg) => {
      if (!msg) {
        return;
      }
      let content: any;
      try {
        content = JSON.parse(msg.content.toString());
      } catch (error) {
        // A body that is not JSON will never parse; requeueing it would loop forever.
        console.error('Message processing error:', error);
        channel.nack(msg, false, false);
        return;
      }
      try {
        await handler(content);
        channel.ack(msg);
      } catch (error) {
        console.error('Message processing error:', error);
        // Retry once: requeue on first delivery, drop on redelivery, so a
        // message the handler can never process does not spin in the queue.
        channel.nack(msg, false, !msg.fields.redelivered);
      }
    });
  }

  /** Closes the channel, then the connection. Safe to call when not connected. */
  async close(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
    this.channel = undefined;
    this.connection = undefined;
  }
}
// Usage example
const mq = new MessageQueue();

/**
 * Order service side: publishes an `order.created` event to the `orders`
 * exchange.
 *
 * @param order Source of the `id`, `userId` and `amount` copied into the event.
 */
async function publishOrderCreated(order: any) {
  await mq.publish('orders', 'order.created', {
    // The notification consumer in this file dispatches on `message.event`,
    // so the event name has to travel in the body, not only as routing key.
    event: 'order.created',
    orderId: order.id,
    userId: order.userId,
    amount: order.amount,
    timestamp: new Date().toISOString()
  });
}
/**
 * Notification service side: consumes `notification-queue` and sends an order
 * confirmation e-mail for every message whose `event` is `order.created`.
 * Messages with any other `event` are accepted without action.
 *
 * NOTE(review): nothing in the visible source binds `notification-queue` to an
 * exchange — confirm the binding exists, otherwise no events arrive here.
 */
async function startNotificationService() {
  const onMessage = async (message: any): Promise<void> => {
    if (message.event !== 'order.created') {
      return;
    }
    await sendOrderConfirmationEmail(message.userId, message.orderId);
  };
  await mq.consume('notification-queue', onMessage);
}
服务发现
// 使用Consul进行服务发现
import Consul from 'consul';
/**
 * Registers this process with a Consul agent and looks up other services.
 */
class ServiceRegistry {
  private consul: Consul;
  private serviceId?: string;

  /**
   * @param options Address of the Consul agent; defaults to localhost:8500,
   *   which is what `new ServiceRegistry()` used before.
   */
  constructor(options: { host?: string; port?: number } = {}) {
    this.consul = new Consul({
      host: options.host ?? 'localhost',
      port: options.port ?? 8500
    });
  }

  /**
   * Registers a service instance with an HTTP health check on `/health`.
   *
   * @param name Logical service name.
   * @param id Unique id of this instance; remembered for {@link deregister}.
   * @param port Port the instance listens on.
   * @param tags Optional tags.
   * @param address Optional address under which the instance is reachable.
   *   When given it is registered with the service and used for the health
   *   check; when omitted no address is registered and the check targets
   *   `localhost`, as before.
   */
  async register(
    name: string,
    id: string,
    port: number,
    tags: string[] = [],
    address?: string
  ) {
    const checkHost = address ?? 'localhost';
    await this.consul.agent.service.register({
      name,
      id,
      port,
      tags,
      ...(address !== undefined ? { address } : {}),
      check: {
        http: `http://${checkHost}:${port}/health`,
        interval: '30s',
        timeout: '10s'
      }
    });
    // Remember the id only once registration succeeded, so deregister() never
    // targets an instance that was not registered.
    this.serviceId = id;
    console.log(`Service ${name} registered with ID ${id}`);
  }

  /** Removes the instance registered through {@link register}; no-op otherwise. */
  async deregister() {
    if (this.serviceId) {
      await this.consul.agent.service.deregister(this.serviceId);
      console.log(`Service ${this.serviceId} deregistered`);
      this.serviceId = undefined;
    }
  }

  /**
   * Returns base URLs (`http://host:port`) of the healthy instances of a service.
   *
   * @param serviceName Logical service name to look up.
   */
  async discover(serviceName: string): Promise<string[]> {
    // `passing: true` restricts the result to instances whose health checks pass.
    const entries = await this.consul.health.service({
      service: serviceName,
      passing: true
    });
    const urls: string[] = [];
    for (const entry of entries as any[]) {
      // An instance registered without its own address is reachable at the
      // address of the node it runs on.
      const host = entry.Service.Address || entry.Node?.Address;
      if (host) {
        urls.push(`http://${host}:${entry.Service.Port}`);
      }
    }
    return urls;
  }
}
// Using service discovery
const registry = new ServiceRegistry();

/**
 * Registers this process as `user-service` on startup and deregisters it when
 * the process is asked to stop.
 *
 * @throws Error when the PORT environment variable is set but is not a valid port number.
 */
async function startService() {
  // `||` on purpose: an empty PORT falls back to the default as well.
  const rawPort = process.env.PORT || 3001;
  const port = Number(rawPort);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid PORT: ${rawPort}`);
  }

  await registry.register(
    'user-service',
    `user-service-${rawPort}`,
    port,
    ['node', 'microservice']
  );

  // Graceful shutdown: always exit, even when deregistration fails, so the
  // process cannot hang on a rejected promise.
  const shutdown = async (): Promise<void> => {
    let exitCode = 0;
    try {
      await registry.deregister();
    } catch (error: unknown) {
      console.error('Failed to deregister service:', error);
      exitCode = 1;
    }
    process.exit(exitCode);
  };
  // SIGINT covers Ctrl+C; SIGTERM is what process managers and container
  // runtimes send on stop.
  process.once('SIGINT', () => void shutdown());
  process.once('SIGTERM', () => void shutdown());
}
/**
 * Client side of service discovery: resolves a service name to the base URL of
 * one of its instances, chosen uniformly at random.
 *
 * @param serviceName Logical service name to resolve.
 * @returns Base URL of the selected instance.
 * @throws Error when discovery returns no instances.
 */
async function getServiceUrl(serviceName: string): Promise<string> {
  const candidates = await registry.discover(serviceName);
  const count = candidates.length;
  if (count > 0) {
    // Simple load balancing: random pick.
    const pick = Math.floor(Math.random() * count);
    return candidates[pick];
  }
  throw new Error(`No instances found for ${serviceName}`);
}
配置管理
// 使用etcd进行配置管理
import { Etcd3 } from 'etcd3';
/**
 * Reads and writes JSON configuration values in etcd, with an in-process cache.
 *
 * Cached entries are refreshed by {@link ConfigManager.set} and by watchers
 * started through {@link ConfigManager.watch}; they are never expired otherwise.
 */
class ConfigManager {
  private client: Etcd3;
  private cache: Map<string, any> = new Map();

  /**
   * @param endpoints etcd server addresses.
   * NOTE(review): etcd3 documents `hosts` as the option for server addresses —
   * confirm that `endpoints` is honoured by the installed client version.
   */
  constructor(endpoints: string[]) {
    this.client = new Etcd3({ endpoints });
  }

  /**
   * Returns the value for `key`: from the cache when present, otherwise from
   * etcd (and then cached). `defaultValue` is returned — and not cached — when
   * etcd has no value for the key.
   */
  async get(key: string, defaultValue?: any): Promise<any> {
    if (this.cache.has(key)) {
      return this.cache.get(key);
    }
    const value = await this.client.get(key).json();
    if (value !== null) {
      this.cache.set(key, value);
      return value;
    }
    return defaultValue;
  }

  /** Stores `value` as JSON in etcd and updates the cache. */
  async set(key: string, value: any): Promise<void> {
    await this.client.put(key).value(JSON.stringify(value));
    this.cache.set(key, value);
  }

  /**
   * Starts watching `key`. On every change the cache is refreshed and
   * `callback` receives the parsed value; when the key is deleted the cached
   * entry is dropped. Failures to start the watch are logged.
   */
  watch(key: string, callback: (value: any) => void): void {
    this.attachWatcher(key, callback).catch((error: unknown) => {
      console.error(`Failed to watch config key "${key}":`, error);
    });
  }

  /** Creates the etcd watcher for `key` and wires its events to cache and callback. */
  private async attachWatcher(key: string, callback: (value: any) => void): Promise<void> {
    // create() resolves asynchronously; listeners can only be attached to the
    // resolved watcher, not to the pending promise.
    const watcher = await this.client.watch().key(key).create();
    watcher.on('put', (res) => {
      let value: any;
      try {
        value = JSON.parse(res.value.toString());
      } catch (error) {
        // A non-JSON value must not throw out of the event handler; keep the
        // last good value instead.
        console.error(`Ignoring non-JSON value for config key "${key}":`, error);
        return;
      }
      this.cache.set(key, value);
      callback(value);
    });
    watcher.on('delete', () => {
      // Drop the stale entry so get() falls back to etcd or the default.
      this.cache.delete(key);
    });
  }
}
// Usage example
const config = new ConfigManager(['http://localhost:2379']);
// Read the database configuration; the object literal below is returned when
// etcd holds no value for the key.
// This statement uses top-level await, so the snippet has to run in a context
// that supports it.
const dbConfig = await config.get('database.config', {
host: 'localhost',
port: 5432,
database: 'myapp'
});
// React to changes of the feature-flag key.
// `updateFeatureFlags` is not defined in the visible source.
config.watch('feature.flags', (flags) => {
console.log('Feature flags updated:', flags);
updateFeatureFlags(flags);
});
监控和日志
// 使用Prometheus进行监控
import promClient from 'prom-client';
// Metric definitions
// Histogram of HTTP request durations in seconds, labelled by method, route
// and status code, with bucket boundaries from 0.1 s to 5 s.
const httpRequestDuration = new promClient.Histogram({
name: 'http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['method', 'route', 'status_code'],
buckets: [0.1, 0.5, 1, 2, 5]
});
// Counter of HTTP requests with the same three labels.
const httpRequestsTotal = new promClient.Counter({
name: 'http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['method', 'route', 'status_code']
});
// Gauge for the number of active connections.
// NOTE(review): nothing in the visible source ever sets this gauge.
const activeConnections = new promClient.Gauge({
name: 'active_connections',
help: 'Number of active connections'
});
// Register the metrics with the default registry.
// NOTE(review): prom-client may already add metrics to the default registry
// when they are constructed — confirm whether these calls are redundant.
promClient.register.registerMetric(httpRequestDuration);
promClient.register.registerMetric(httpRequestsTotal);
promClient.register.registerMetric(activeConnections);
/** The request fields the metrics middleware reads. */
interface MetricsRequest {
  method: string;
  baseUrl?: string;
  route?: { path: string };
}

/** The response fields the metrics middleware reads. */
interface MetricsResponse {
  statusCode: number;
  on(event: 'finish', listener: () => void): unknown;
}

/**
 * Express middleware that records duration and count of every request once
 * its response has finished.
 *
 * The `route` label is the matched route pattern including the mount path of
 * its router (e.g. `/api/users/:id`). Requests that matched no route are
 * labelled `unmatched`: using the raw request path would create a new time
 * series for every distinct URL a client sends.
 *
 * @param req Incoming request.
 * @param res Outgoing response; its `finish` event triggers the recording.
 * @param next Continues the middleware chain.
 */
function metricsMiddleware(req: MetricsRequest, res: MetricsResponse, next: () => void): void {
  const start = Date.now();
  res.on('finish', () => {
    const duration = (Date.now() - start) / 1000;
    // `req.route` is read here, after the request was handled, and is absent
    // when nothing matched.
    const route = req.route ? `${req.baseUrl ?? ''}${req.route.path}` : 'unmatched';
    const labels = { method: req.method, route, status_code: res.statusCode };
    httpRequestDuration.observe(labels, duration);
    httpRequestsTotal.inc(labels);
  });
  next();
}
/**
 * GET /metrics — exposes all metrics of the default registry in the
 * registry's content type. Responds 500 when collecting the metrics fails.
 */
app.get('/metrics', async (_req, res) => {
  try {
    // Collect first: headers are only set once there is a body to send.
    const body = await promClient.register.metrics();
    res.set('Content-Type', promClient.register.contentType);
    res.end(body);
  } catch (error: unknown) {
    // Without this, a rejected promise in the async handler would leave the
    // request without a response.
    console.error('Failed to collect metrics:', error);
    res.status(500).end();
  }
});
// 结构化日志
import winston from 'winston';
// Structured logger: JSON entries with a timestamp, tagged with the service
// name. Everything at `info` and above goes to the console and combined.log;
// error.log is restricted to the `error` level.
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
defaultMeta: { service: 'user-service' },
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' })
]
});
// Logging examples: a message plus a metadata object.
logger.info('User created', { userId: '123', email: 'user@example.com' });
// NOTE(review): `err` is not defined in the visible source; this line is meant
// to run where a caught error is in scope.
logger.error('Database connection failed', { error: err.message, stack: err.stack });
总结
构建Node.js微服务架构需要考虑多个方面:
- 服务通信:选择合适的通信方式(HTTP、gRPC、消息队列)
- 服务发现:实现服务的自动注册和发现
- 容错处理:使用断路器、重试、超时等机制
- 配置管理:集中管理配置,支持动态更新
- 监控日志:完善的监控和日志系统
微服务架构虽然带来了灵活性和可扩展性,但也增加了系统的复杂性。在设计时需要权衡利弊,选择适合业务场景的架构方案。