Node.js微服务架构设计与实现

编程技术

Node.js微服务架构设计与实现

微服务架构是现代应用开发的主流模式。本文将介绍如何使用Node.js构建可扩展、高可用的微服务系统。

微服务架构概述

架构特点

  • 服务独立性:每个服务独立开发、部署、扩展
  • 技术多样性:不同服务可以使用不同技术栈
  • 故障隔离:单个服务的故障被限制在局部,配合超时、断路器等机制可避免拖垮整体系统
  • 弹性扩展:根据负载独立扩展服务

服务拆分原则

按业务领域拆分:
├── 用户服务 (User Service)
├── 订单服务 (Order Service)
├── 支付服务 (Payment Service)
├── 库存服务 (Inventory Service)
└── 通知服务 (Notification Service)

服务通信

1. HTTP/REST API

// 用户服务
import express from 'express';
import axios from 'axios';

const app = express();
app.use(express.json());

// Base URLs of the downstream services. Each can be overridden through the
// environment and otherwise falls back to a local development address.
const SERVICES = {
  ORDER_SERVICE: process.env.ORDER_SERVICE_URL || 'http://localhost:3002',
  PAYMENT_SERVICE: process.env.PAYMENT_SERVICE_URL || 'http://localhost:3003'
};

// Create a user from the JSON request body.
// Responds 201 with the created user, or 500 when createUser rejects.
app.post('/users', async (req, res) => {
  try {
    const user = await createUser(req.body);
    res.status(201).json(user);
  } catch (error: unknown) {
    // A rejection can carry any value, so narrow before reading `.message`.
    // Without this a non-Error rejection answers `{}` and a `null` rejection
    // throws inside the catch block, leaving the request hanging.
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ error: message });
  }
});

// List a user's orders by delegating to the order service.
// Responds 404 when the user is unknown, 504 when the order service times
// out, mirrors the order service's own error response when it sent one, and
// 500 for anything else.
app.get('/users/:userId/orders', async (req, res) => {
  try {
    const { userId } = req.params;

    // Make sure the user exists before calling downstream.
    const user = await getUser(userId);
    if (!user) {
      return res.status(404).json({ error: 'User not found' });
    }

    // Call the order service.
    const response = await axios.get(`${SERVICES.ORDER_SERVICE}/orders`, {
      // Let axios serialise the query string: a userId containing `&`, `=`
      // or `#` must not be able to add or change downstream parameters.
      params: { userId },
      headers: {
        // Propagate the caller's request id, or mint one for tracing.
        'X-Request-ID': req.headers['x-request-id'] || generateRequestId(),
        'X-User-ID': userId
      },
      timeout: 5000
    });

    res.json(response.data);
  } catch (error: unknown) {
    if (axios.isAxiosError(error)) {
      // axios reports an exceeded `timeout` with code ECONNABORTED.
      if (error.code === 'ECONNABORTED') {
        return res.status(504).json({ error: 'Order service timeout' });
      }
      // The order service answered with an error status: pass it through.
      if (error.response) {
        return res.status(error.response.status).json(error.response.data);
      }
    }
    // Narrow before reading `.message`; a rejection may carry any value.
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ error: message });
  }
});

/**
 * Circuit breaker around an async operation.
 *
 * - CLOSED: calls pass through; consecutive failures are counted.
 * - OPEN: calls are rejected immediately with `Circuit breaker is OPEN`
 *   until `timeout` milliseconds have passed since the breaker opened.
 * - HALF_OPEN: entered by the first call after the timeout; a success closes
 *   the breaker and resets the failure count, a failure opens it again.
 */
class CircuitBreaker {
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  private failureCount = 0;
  private readonly failureThreshold: number;
  private readonly timeout: number;
  private nextAttempt = Date.now();

  /**
   * @param options.failureThreshold Consecutive failures that open the
   *   breaker (default 5).
   * @param options.timeout Milliseconds the breaker stays open before a
   *   trial call is allowed (default 60000).
   */
  constructor(options: { failureThreshold?: number; timeout?: number } = {}) {
    this.failureThreshold = options.failureThreshold ?? 5;
    this.timeout = options.timeout ?? 60000;
  }

  /**
   * Runs `fn` through the breaker.
   *
   * @returns Whatever `fn` resolves to.
   * @throws Error('Circuit breaker is OPEN') without calling `fn` while the
   *   breaker is open; otherwise rethrows whatever `fn` rejects with.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error('Circuit breaker is OPEN');
      }
      // The cool-down has elapsed: let this call through as a trial.
      this.state = 'HALF_OPEN';
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    this.failureCount = 0;
    this.state = 'CLOSED';
  }

  private onFailure(): void {
    this.failureCount++;
    // A failed trial call re-opens the breaker right away; in the closed
    // state it opens once the threshold is reached.
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.timeout;
    }
  }
}

// One breaker shared by every call to the order service.
const orderServiceBreaker = new CircuitBreaker();

// Same lookup as the plain orders route, but routed through the breaker.
// Any failure (order service error, timeout, or an open breaker) is
// answered with 503.
app.get('/users/:userId/orders-safe', async (req, res) => {
  try {
    const orders = await orderServiceBreaker.execute(async () => {
      const response = await axios.get(`${SERVICES.ORDER_SERVICE}/orders`, {
        // Let axios encode the value so the path parameter cannot inject
        // extra query parameters into the downstream request.
        params: { userId: req.params.userId },
        timeout: 5000
      });
      return response.data;
    });
    res.json(orders);
  } catch (error: unknown) {
    // Keep the cause visible to operators; the client only sees the 503.
    console.error('Order service call failed:', error);
    res.status(503).json({ error: 'Service temporarily unavailable' });
  }
});

2. gRPC通信

// user.proto
// Schema for the user service: one service with four unary RPCs and the
// request/response messages they exchange.
syntax = "proto3";

package user;

// CRUD-style operations on users. Every RPC takes a single request message
// and returns a single response message.
service UserService {
  rpc GetUser (GetUserRequest) returns (User);
  rpc CreateUser (CreateUserRequest) returns (User);
  rpc UpdateUser (UpdateUserRequest) returns (User);
  rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
}

// Identifies the user to fetch.
message GetUserRequest {
  string id = 1;
}

// Fields supplied when creating a user. Note that `password` has no
// counterpart in the `User` message below.
message CreateUserRequest {
  string name = 1;
  string email = 2;
  string password = 3;
}

// Identifies the user to update and carries the new name/email values.
message UpdateUserRequest {
  string id = 1;
  string name = 2;
  string email = 3;
}

// Identifies the user to delete.
message DeleteUserRequest {
  string id = 1;
}

// User as returned by GetUser, CreateUser and UpdateUser.
// The two timestamps are carried as plain strings.
message User {
  string id = 1;
  string name = 2;
  string email = 3;
  string created_at = 4;
  string updated_at = 5;
}

// Outcome of DeleteUser.
message DeleteUserResponse {
  bool success = 1;
  string message = 2;
}
// gRPC服务端
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

// Path of the service definition handed to the proto loader.
const PROTO_PATH = './user.proto';

// Options for the proto loader (see the @grpc/proto-loader documentation
// for the exact effect of each flag).
const PROTO_LOADER_OPTIONS = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
};

// Parse the .proto file synchronously at module load.
const packageDefinition = protoLoader.loadSync(PROTO_PATH, PROTO_LOADER_OPTIONS);

// Turn the parsed definition into gRPC objects and pick the `user` package.
const loadedPackages = grpc.loadPackageDefinition(packageDefinition);
const userProto = loadedPackages.user;

/** A stored user, with the same fields as the `User` message in user.proto. */
type UserRecord = {
  id: string;
  name: string;
  email: string;
  created_at: string;
  updated_at: string;
};

/** The part of a unary gRPC call these handlers read. */
type UnaryCall<TRequest> = { request: TRequest };

/** Unary completion callback: an error object, or `null` plus the response. */
type UnaryCallback<TResponse> = (
  error: { code: number; message: string } | null,
  value?: TResponse
) => void;

// In-memory user store keyed by user id.
const users = new Map<string, UserRecord>();

// Handlers for the four UserService RPCs. Each one answers through
// `callback`, either with `(null, response)` or with a NOT_FOUND error.
const userService = {
  /** Looks a user up by id. */
  getUser: (call: UnaryCall<{ id: string }>, callback: UnaryCallback<UserRecord>) => {
    const user = users.get(call.request.id);
    if (user) {
      callback(null, user);
    } else {
      callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
    }
  },

  /** Creates a user under a freshly generated id and returns it. */
  createUser: (
    call: UnaryCall<{ name: string; email: string; password: string }>,
    callback: UnaryCallback<UserRecord>
  ) => {
    const id = generateId();
    const now = new Date().toISOString();
    // Copy only the fields of the User message. Spreading the whole request
    // also kept the plaintext password in the stored record.
    // NOTE(review): the password is not persisted at all here; hash and
    // store it separately if credentials are needed.
    const user: UserRecord = {
      id,
      name: call.request.name,
      email: call.request.email,
      created_at: now,
      updated_at: now
    };
    users.set(id, user);
    callback(null, user);
  },

  /** Updates name and/or email of an existing user and returns the result. */
  updateUser: (
    call: UnaryCall<{ id: string; name?: string; email?: string }>,
    callback: UnaryCallback<UserRecord>
  ) => {
    const { id, name, email } = call.request;
    const user = users.get(id);
    if (!user) {
      callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
      return;
    }
    // The proto file is loaded with `defaults: true`, so a field the client
    // left unset arrives as ''. Only non-empty values overwrite the stored
    // ones; spreading the request wiped name/email on partial updates.
    const updatedUser: UserRecord = {
      ...user,
      ...(name ? { name } : {}),
      ...(email ? { email } : {}),
      updated_at: new Date().toISOString()
    };
    users.set(id, updatedUser);
    callback(null, updatedUser);
  },

  /** Deletes a user by id. */
  deleteUser: (
    call: UnaryCall<{ id: string }>,
    callback: UnaryCallback<{ success: boolean; message: string }>
  ) => {
    // Map.delete reports whether the key existed.
    if (users.delete(call.request.id)) {
      callback(null, { success: true, message: 'User deleted' });
    } else {
      callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
    }
  }
};

/**
 * Entry point of the gRPC server: registers the user service handlers and
 * binds to 0.0.0.0:50051 with insecure (plaintext) credentials.
 * A bind error is logged and the server is not started; on success the
 * bound port is logged and the server is started.
 */
function main() {
  const server = new grpc.Server();
  server.addService(userProto.UserService.service, userService);

  const address = '0.0.0.0:50051';
  const credentials = grpc.ServerCredentials.createInsecure();

  server.bindAsync(address, credentials, (err, port) => {
    if (!err) {
      console.log(`gRPC server running at port ${port}`);
      server.start();
    } else {
      console.error(err);
    }
  });
}

main();

3. 消息队列

// 使用RabbitMQ
import amqp from 'amqplib';

/**
 * Thin wrapper around one amqplib connection and one channel.
 * Call `connect()` before `publish()` or `consume()`; both throw a
 * descriptive error otherwise.
 */
class MessageQueue {
  private connection?: amqp.Connection;
  private channel?: amqp.Channel;

  /** Opens the connection to `url` and creates the channel on it. */
  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  /**
   * Publishes `message` as JSON to a durable topic exchange, asserting the
   * exchange first. The message is marked persistent.
   *
   * @throws Error when called before `connect()`.
   */
  async publish(exchange: string, routingKey: string, message: unknown): Promise<void> {
    const channel = this.requireChannel();
    await channel.assertExchange(exchange, 'topic', { durable: true });
    channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }

  /**
   * Asserts a durable queue and starts consuming from it.
   *
   * Each message body is parsed as JSON and passed to `handler`:
   * - handler resolves: the message is acked;
   * - handler rejects: the message is nacked and requeued;
   * - body is not valid JSON: the message is nacked WITHOUT requeue, since
   *   redelivering it can never succeed and would loop forever.
   *
   * Resolves once the consumer is registered on the channel.
   *
   * @throws Error when called before `connect()`.
   */
  async consume(queue: string, handler: (msg: any) => Promise<void>): Promise<void> {
    const channel = this.requireChannel();
    await channel.assertQueue(queue, { durable: true });

    await channel.consume(queue, async (msg: amqp.ConsumeMessage | null) => {
      if (!msg) {
        return;
      }

      let content: unknown;
      try {
        content = JSON.parse(msg.content.toString());
      } catch (error) {
        console.error('Message processing error:', error);
        channel.nack(msg, false, false);
        return;
      }

      try {
        await handler(content);
        channel.ack(msg);
      } catch (error) {
        console.error('Message processing error:', error);
        channel.nack(msg, false, true);
      }
    });
  }

  /** Closes the channel, then the connection. Does nothing if never connected. */
  async close(): Promise<void> {
    const { channel, connection } = this;
    this.channel = undefined;
    this.connection = undefined;
    if (channel) {
      await channel.close();
    }
    if (connection) {
      await connection.close();
    }
  }

  /** Returns the open channel, or throws when `connect()` has not been called. */
  private requireChannel(): amqp.Channel {
    if (!this.channel) {
      throw new Error('MessageQueue is not connected; call connect() first');
    }
    return this.channel;
  }
}

// Usage example.
// NOTE(review): nothing in this snippet calls mq.connect() or binds
// 'notification-queue' to the 'orders' exchange; both have to happen
// elsewhere before these functions can deliver anything.
const mq = new MessageQueue();

// Order service: publish an event when an order is created.
async function publishOrderCreated(order: any) {
  await mq.publish('orders', 'order.created', {
    // The notification consumer filters on `event`, so the payload must
    // carry it; without it the confirmation email is never sent.
    event: 'order.created',
    orderId: order.id,
    userId: order.userId,
    amount: order.amount,
    timestamp: new Date().toISOString()
  });
}

// Notification service: subscribe and send a confirmation email for every
// `order.created` event.
async function startNotificationService() {
  await mq.consume('notification-queue', async (message) => {
    if (message.event === 'order.created') {
      await sendOrderConfirmationEmail(message.userId, message.orderId);
    }
  });
}

服务发现

// 使用Consul进行服务发现
import Consul from 'consul';

/**
 * Registers this process with a Consul agent and looks up other services.
 */
class ServiceRegistry {
  private readonly consul: Consul;
  private serviceId?: string;

  /**
   * @param options.host Consul agent host (default 'localhost').
   * @param options.port Consul agent port (default 8500).
   */
  constructor(options: { host?: string; port?: number } = {}) {
    this.consul = new Consul({
      host: options.host ?? 'localhost',
      port: options.port ?? 8500
    });
  }

  /**
   * Registers a service instance together with an HTTP health check on
   * `http://localhost:<port>/health` (every 30s, 10s timeout) and remembers
   * `id` for `deregister()`.
   */
  async register(
    name: string,
    id: string,
    port: number,
    tags: string[] = []
  ): Promise<void> {
    this.serviceId = id;

    await this.consul.agent.service.register({
      name,
      id,
      port,
      tags,
      check: {
        http: `http://localhost:${port}/health`,
        interval: '30s',
        timeout: '10s'
      }
    });

    console.log(`Service ${name} registered with ID ${id}`);
  }

  /**
   * Deregisters the instance registered through `register()`.
   * Does nothing when no instance is registered, so calling it twice is safe.
   */
  async deregister(): Promise<void> {
    const id = this.serviceId;
    if (id) {
      await this.consul.agent.service.deregister(id);
      this.serviceId = undefined;
      console.log(`Service ${id} deregistered`);
    }
  }

  /**
   * Returns the base URLs (`http://host:port`) of the instances of
   * `serviceName` whose health checks are passing.
   */
  async discover(serviceName: string): Promise<string[]> {
    // `passing: true` keeps instances with failing checks out of the result.
    const entries = await this.consul.health.service({
      service: serviceName,
      passing: true
    });
    return entries.map((entry: any) => {
      // A service registered without an explicit address (as `register`
      // above does) has an empty Service.Address; fall back to the address
      // of the node it runs on instead of producing `http://:port`.
      const host = entry.Service.Address || entry.Node?.Address;
      return `http://${host}:${entry.Service.Port}`;
    });
  }
}

// Using service discovery.
const registry = new ServiceRegistry();

/**
 * Registers this process as `user-service` on startup and deregisters it
 * again when the process is asked to stop.
 * The port comes from the PORT environment variable (default 3001).
 */
async function startService() {
  const port = process.env.PORT || 3001;

  await registry.register(
    'user-service',
    `user-service-${port}`,
    Number(port),
    ['node', 'microservice']
  );

  // Graceful shutdown. Always exit, even when deregistration fails;
  // otherwise a rejected deregister() leaves the process hanging.
  const shutdown = async (): Promise<void> => {
    let exitCode = 0;
    try {
      await registry.deregister();
    } catch (error) {
      console.error('Failed to deregister service:', error);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  // SIGINT is Ctrl+C; SIGTERM is what process managers and container
  // runtimes send to stop a service.
  for (const signal of ['SIGINT', 'SIGTERM'] as const) {
    process.once(signal, () => {
      void shutdown();
    });
  }
}

/**
 * Client-side lookup: resolves `serviceName` through the registry and
 * returns the URL of one instance chosen uniformly at random (a very simple
 * form of load balancing).
 *
 * @throws Error when the registry returns no instances.
 */
async function getServiceUrl(serviceName: string): Promise<string> {
  const candidates = await registry.discover(serviceName);
  const count = candidates.length;

  if (count > 0) {
    const pick = Math.floor(Math.random() * count);
    return candidates[pick];
  }

  throw new Error(`No instances found for ${serviceName}`);
}

配置管理

// 使用etcd进行配置管理
import { Etcd3 } from 'etcd3';

/**
 * JSON configuration stored in etcd, with an in-process cache.
 */
class ConfigManager {
  private readonly client: Etcd3;
  private readonly cache = new Map<string, any>();

  /** @param endpoints etcd endpoints handed to the Etcd3 client. */
  constructor(endpoints: string[]) {
    this.client = new Etcd3({ endpoints });
  }

  /**
   * Returns the value for `key`: from the cache if present, otherwise from
   * etcd (parsed as JSON and then cached). Falls back to `defaultValue`
   * when etcd returns null for the key; the default is not cached.
   */
  async get(key: string, defaultValue?: any): Promise<any> {
    // Check the cache first.
    if (this.cache.has(key)) {
      return this.cache.get(key);
    }

    // Then ask etcd.
    const value = await this.client.get(key).json();
    if (value !== null) {
      this.cache.set(key, value);
      return value;
    }

    return defaultValue;
  }

  /** Writes `value` to etcd as JSON and updates the cache. */
  async set(key: string, value: any): Promise<void> {
    await this.client.put(key).value(JSON.stringify(value));
    this.cache.set(key, value);
  }

  /**
   * Watches `key` and calls `callback` with the parsed value on every put.
   * The cache follows the watch: puts replace the cached value, deletes
   * evict it. A put whose value is not valid JSON is logged and ignored.
   * Failure to establish the watch is logged; this method never throws.
   */
  watch(key: string, callback: (value: any) => void): void {
    // `create()` resolves to the watcher, so handlers can only be attached
    // once the promise settles; calling `.on` on the promise itself throws.
    this.client
      .watch()
      .key(key)
      .create()
      .then((watcher) => {
        watcher.on('put', (res) => {
          let value: unknown;
          try {
            value = JSON.parse(res.value.toString());
          } catch (error) {
            console.error(`Ignoring non-JSON value for config key "${key}":`, error);
            return;
          }
          this.cache.set(key, value);
          callback(value);
        });

        // Without this, get() keeps serving a value that no longer exists.
        watcher.on('delete', () => {
          this.cache.delete(key);
        });
      })
      .catch((error: unknown) => {
        console.error(`Failed to watch config key "${key}":`, error);
      });
  }
}

// Usage example.
const config = new ConfigManager(['http://localhost:2379']);

// Read the database configuration, falling back to local defaults.
const DEFAULT_DB_CONFIG = {
  host: 'localhost',
  port: 5432,
  database: 'myapp'
};
const dbConfig = await config.get('database.config', DEFAULT_DB_CONFIG);

// React to configuration changes: log the new flags, then apply them.
function onFeatureFlagsChanged(flags: any): void {
  console.log('Feature flags updated:', flags);
  updateFeatureFlags(flags);
}

config.watch('feature.flags', onFeatureFlagsChanged);

监控和日志

// 使用Prometheus进行监控
import promClient from 'prom-client';

// Metric definitions.

// Request latency in seconds, bucketed from 100 ms to 5 s.
const httpRequestDuration = new promClient.Histogram({
  help: 'Duration of HTTP requests in seconds',
  name: 'http_request_duration_seconds',
  buckets: [0.1, 0.5, 1, 2, 5],
  labelNames: ['method', 'route', 'status_code']
});

// Request count, labelled the same way as the latency histogram.
const httpRequestsTotal = new promClient.Counter({
  help: 'Total number of HTTP requests',
  name: 'http_requests_total',
  labelNames: ['method', 'route', 'status_code']
});

// Gauge for the number of currently open connections.
const activeConnections = new promClient.Gauge({
  help: 'Number of active connections',
  name: 'active_connections'
});

// Register each metric with the default registry, in declaration order.
for (const metric of [httpRequestDuration, httpRequestsTotal, activeConnections]) {
  promClient.register.registerMetric(metric);
}

/**
 * Express middleware that records one latency observation and one counter
 * increment per request once the response has finished.
 *
 * The `route` label is the matched route pattern (for example
 * `/users/:userId`). Requests that matched no route are all labelled
 * `unmatched`: labelling them with the raw path would create a new time
 * series for every distinct URL a client happens to request.
 */
function metricsMiddleware(
  req: { method: string; route?: { path: string } },
  res: { statusCode: number; on(event: 'finish', listener: () => void): unknown },
  next: () => void
): void {
  // Monotonic clock: unlike Date.now() it is unaffected by wall-clock
  // adjustments, so a measured duration can never be negative.
  const startedAt = process.hrtime.bigint();

  res.on('finish', () => {
    const duration = Number(process.hrtime.bigint() - startedAt) / 1e9;
    const labels = {
      method: req.method,
      route: req.route ? req.route.path : 'unmatched',
      status_code: res.statusCode
    };

    httpRequestDuration.observe(labels, duration);
    httpRequestsTotal.inc(labels);
  });

  next();
}

// Metrics endpoint: serves every registered metric in the registry's
// exposition format.
app.get('/metrics', async (req, res) => {
  try {
    const body = await promClient.register.metrics();
    res.set('Content-Type', promClient.register.contentType);
    res.end(body);
  } catch (error: unknown) {
    // A rejection in an async handler would otherwise go unhandled and the
    // request would never be answered.
    console.error('Failed to collect metrics:', error);
    res.status(500).end();
  }
});

// 结构化日志
import winston from 'winston';

// JSON logger with timestamps. Every entry carries `service: 'user-service'`
// and goes to the console and combined.log; entries at level `error` also go
// to error.log.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  defaultMeta: { service: 'user-service' },
  transports: [
    new winston.transports.Console(),
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});

// Usage: pass structured context as the second argument.
logger.info('User created', { userId: '123', email: 'user@example.com' });

/**
 * Logs a failed database connection with the error's message and stack.
 * Call it from the `catch` block around the connection attempt. The
 * original example referenced an `err` variable that was never defined.
 */
function logDatabaseConnectionFailure(err: unknown): void {
  const details =
    err instanceof Error
      ? { error: err.message, stack: err.stack }
      : { error: String(err) };
  logger.error('Database connection failed', details);
}

总结

构建Node.js微服务架构需要考虑多个方面:

  1. 服务通信:选择合适的通信方式(HTTP、gRPC、消息队列)
  2. 服务发现:实现服务的自动注册和发现
  3. 容错处理:使用断路器、超时、重试等机制,防止故障在服务之间级联扩散
  4. 配置管理:集中管理配置,支持动态更新
  5. 监控日志:完善的监控和日志系统

微服务架构虽然带来了灵活性和可扩展性,但也增加了系统的复杂性。在设计时需要权衡利弊,选择适合业务场景的架构方案。