2019独角兽企业重金招聘Python工程师标准>>>
摘要
- 端口9876是写死的。在这个方法中 org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[])
- properties 文件可以在启动参数 -c 中传入,主要配置netty、nameser的属性。详见NettyServerConfig属性配置、NamesrvConfig属性配置
- 初始化netty相关的类,启动netty,注册注册以下事件 HandshakeHandler、NettyEncoder、NettyDecoder、IdleStateHandler、NettyConnectManageHandler、NettyServerHandler,其中NettyServerHandler是处理请求和响应的。
源码阅读
启动执行顺序
/**
* 可以通过启动参数 -c 来配置启动参数文件,然后把这些文件注入到 namesrvConfig、NettyServerConfig
*/
org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[]) 创建建 NamesrvController,读取properties配置
--- org.apache.rocketmq.namesrv.NamesrvStartup.start(NamesrvController) 启动NamesrvController
------org.apache.rocketmq.namesrv.NamesrvController.initialize()
---------org.apache.rocketmq.namesrv.kvconfig.KVConfigManager.load() 1、初始化参数 ${user.home}/namesrv/kvConfig.json
---------org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyRemotingServer(NettyServerConfig, ChannelEventListener) 启动netty
------java.lang.Runtime.addShutdownHook(Thread)
---------org.apache.rocketmq.namesrv.NamesrvController.shutdown() 关闭nameserver
------org.apache.rocketmq.namesrv.NamesrvController.start()
---------org.apache.rocketmq.remoting.netty.NettyRemotingServer.start() 启动 netty,pipline上注册以下事件 HandshakeHandler、NettyEncoder、NettyDecoder、IdleStateHandler、NettyConnectManageHandler、NettyServerHandler。除此之外,还起了一些异步线程检查一些broker状态之类的。
重要类、方法说明
NettyServerConfig属性配置
public class NettyServerConfig implements Cloneable {
//nameserver的netty侦听端口,实际上是9876,外面被重新赋值了
private int listenPort = 8888;//实际上是9876
//nameserver工作线程数
private int serverWorkerThreads = 8;
//nameserver 回调处理线程数
private int serverCallbackExecutorThreads = 0;
//nameserver 处理netty的selector事件的线程数
private int serverSelectorThreads = 3;
//
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
//65535
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
//65535
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
}
NamesrvConfig属性配置
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//rocketmq.home.dir 示例 D:\zhongwangspace\rocketmq-master\distribution
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//${user.home}/namesrv/kvConfig.json 示例 C:\Users\fengpc\namesrv\kvConfig.json
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//${user.home}/namesrv/namesrv.properties 示例 C:\Users\fengpc\namesrv\namesrv.properties
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
HandshakeHandler
org.apache.rocketmq.remoting.netty.NettyRemotingServer的内部类,源代码:
class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final TlsMode tlsMode;
private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
HandshakeHandler(TlsMode tlsMode) {
this.tlsMode = tlsMode;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// mark the current position so that we can peek the first byte to determine if the content is starting with
// TLS handshake
msg.markReaderIndex();
byte b = msg.getByte(0);
if (b == HANDSHAKE_MAGIC_CODE) {
switch (tlsMode) {
case DISABLED:
ctx.close();
log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error("Trying to establish a SSL connection but sslContext is null");
}
break;
default:
log.warn("Unknown TLS mode");
break;
}
} else if (tlsMode == TlsMode.ENFORCING) {
ctx.close();
log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// reset the reader index so that handshake negotiation may proceed as normal.
msg.resetReaderIndex();
try {
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
// Hand over this message to the next .
ctx.fireChannelRead(msg.retain());
}
}
整理逻辑是,根据第一个字符是不是0,如果是0,则判断是不是TLS mode。
NettyEncoder
整体逻辑是对远程指令remotingCommand加一个消息header。remotingCommand 是封装request、response、header、消息的body等内容。 NettyEncoder 源码如下:
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
NettyDecoder
这里接收到消息,调用 RemotingCommand.decode 将消息内容解码,源码如下:
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
}
IdleStateHandler
NettyConnectManageHandler
这里主要处理netty连接相关事件的,源码如下:
/**
* 处理netty连接,
* channelRegistered 是把连接过来的地址注册起来
* channelUnregistered 取消连接注册
*/
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());
}
}
NettyServerHandler
这个是最重要的,处理netty的请求和响应的。源码如下:
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler
// 处理netty收到的消息,分为请求消息 和 响应消息
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
/** 处理发送过来的消息
* 分请求消息 和 响应消息
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
/**
* 处理请求
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
/**
* 处理响应的
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}