为什么新浪微博总是系统繁忙刷新总是出现Rpc ERROR

用心创造滤镜
扫码下载App
汇聚2000万达人的兴趣社区下载即送20张免费照片冲印
扫码下载App
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!&&|&&
主要从事海量数据处理,搜索的工作,工作之余喜欢写点文章。总结经验。欢迎转载,但转载请说明出处。也可以关注我的新浪微博/jamvp,进行交流。
LOFTER精选
网易考拉推荐
用微信&&“扫一扫”
将文章分享到朋友圈。
用易信&&“扫一扫”
将文章分享到朋友圈。
//创建rpcServer。this.rpcServer = new RpcServer(this, name, getServices(),& & & /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/& & & initialIsa, // BindAddress is IP we got for this server.& & & conf.getInt("hbase.regionserver.handler.count", 10),& & & conf.getInt("hbase.regionserver.metahandler.count", 10),& & & conf, HConstants.QOS_THRESHOLD);hbase.regionserver.handler.count 代表处理客户端请求的线传输。默认10个,如果你的CPU核数比较多,或者处理不过来,可以增大这个参数。提供并发处理能力。hbase.regionserver.metahandler.count 处理对meta表操作的请求。默认也是10个。RpcServer&构成函数如下,只列出关键的代码:public RpcServer(final Server serverInstance, final String name,& & & final List&BlockingServiceAndInterface& services,& & & final InetSocketAddress isa, int handlerCount, int priorityHandlerCount, Configuration conf,& & & int highPriorityLevel)& throws IOException {&&& & //接收请求队列的大小,即call的个数。即线程个数*10,& & this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length",& & & handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);& &///接收请求队列的缓存的call的内存最大值,默认是1024 * 1024 * 1024=1G的空间& &//hbase rs 会做流控,即如果客户端的请求在全局队列里的个数和大小有一个超过了上面两个变量。就会拒绝,告诉客户端服务端出现拥堵& & this.maxQueueSize =& & & this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);& & //读socket的线程数,rs会对建立连接的channel绑定到一个线程上,采用rr算法来选择readThread。& & //这个设计和netty的workGroup设计类似,都是在服务端开启多个读线程。负责将请求反序列化成Call,放到callQueue 队列中,& & //由 handler线程去取处理实现调用。reade线程的创建在Listener来创建的,有一个线程池来执行,reade是一个runnable。& & this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);& & this.callQueue = new LinkedBlockingQueue&Call&(maxQueueLength);& & if (priorityHandlerCount & 0) {& & & this.priorityCallQueue = new LinkedBlockingQueue&Call&(maxQueueLength); // TODO hack on size& & } else {& & & this.priorityCallQueue =& & }& & this.highPriorityLevel = highPriorityL&&& & // Start the listener here and let it bind to the port&& &//监听客户端建立连接的线程。有Listener负责。相当于netty的boss线程。hbase是用一个线程来负责。& & listener = new Listener(name);& & this.port = listener.getAddress().getPort();& && & // Create the responder here&& & //region server 的写操作由Responder线程负责。即服务端逻辑执行完了,把结果写给客户端。& & responder = new Responder();&&& }Listener,Reader,Handler,Response线程都准备好了,就查下命令让他们上岗工作。是在region server在向master注册成功后,region server自己的初始化工作完成后,会调用this.rpcServer.start();来启动上面的几个线程。一次RPC请求的处理逻辑是Listener---&Reader----&Handler-----&Response。下面一个一个来分析。Listener 线程listener的工作比较简单,就是收到client的connection时,1 通过RRoubin算法选择一个Reader。2 把该连接对应的SocketChannel的OP_READ事件注册到选择的Reader的selector上,为读做准备。3 创建一个Connection实例,并attach到OP_READ事件的SelectionKey上。这样在读该channel上的数据实,就能拿到该connection。Reader 线程 多个reader 基本上只是做了一个代理,在接收到有数据可读后,通过SelectionKey 拿到connection,通过connection来读数据。ConnectionConnection 建立连接最先读到的是client发过来的长度为6个字节的heade报文。内容是:'HBas' + VERSION + AUTH_CODE,VERSION 是rpc的版本,是字节0,AUTH_CODE是认证的字节码80,默认代表SIMPLE,即不认证。如果属性hbase.security.authentication的配置为‘kerberos’即用kerberos来做安全认证。但这个需要hdfs层面也做了kerberos安全,才行。数据处理,正常的请求是由processRequest方法来处理的,buf是IO线程Read调用connection的readAndProcess方法读到的字节。connection的processRequest方法代码如下:protected void processRequest(byte[] buf) throws IOException, InterruptedException {& & & long totalRequestSize = buf.& & & int offset = 0;& & & // Here we read in the header. We avoid having pb& & & // do its default 4k allocation for CodedInputStream. We force it to use backing array.& & & CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);& & & int headerSize = cis.readRawVarint32();& & & offset = cis.getTotalBytesRead();& & & RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();& & & offset += headerS& & & int id = header.getCallId();& & & if (LOG.isTraceEnabled()) {& & & & LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +& & & & & " totalRequestSize: " + totalRequestSize + " bytes");& & & }& & & //到这里都是pb序列化的协议。totalRequestSize 代表当前包的长度。callQueueSize.get代表当前请求queue的字节大小。& & & //如果超过了maxQueueSize,则需要告诉客户端服务端处理比较慢了。队列满了。这个就是高可用的基本原则把。快速失败,不& & & //需要长时间等待。需要注意的是,如果出现这样的日志,可以看CPU利用是否高,如果不高的话,可以增加handler线程的个数。& & &// 以提高吞吐量。& & & // Enforcing the call queue size, this triggers a retry in the client& & & // This is a bit late to be doing this check - we have already read in the total request.& & & if ((totalRequestSize + callQueueSize.get()) & maxQueueSize) {& & & & final Call callTooBig =& & & & & new Call(id, this.service, null, null, null, this,& & & & & & responder, totalRequestSize, null);& & & & ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();& & & & setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),& & & & & "Call queue is full, is ipc.server.max.callqueue.size too small?");& & & & responder.doRespond(callTooBig);& & & && & & }& & & MethodDescriptor md =& & & Message param =& & & CellScanner cellScanner =& & & try {& & & & if (header.hasRequestParam() && header.getRequestParam()) {& & & & & md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());& & & & & Builder builder = this.service.getRequestPrototype(md).newBuilderForType();& & & & & // To read the varint, I might as well be a CIS.& & & & & cis = CodedInputStream.newInstance(buf, offset, buf.length);& & & & & int paramSize = cis.readRawVarint32();& & & & & offset += cis.getTotalBytesRead();& & & & & if (builder != null) {& & & & & & param = builder.mergeFrom(buf, offset, paramSize).build();& & & & & }& & & & & offset += paramS& & & & }& & & & if (header.hasCellBlockMeta()) {& & & & & cellScanner = ipcUtil.createCellScanner(this.codec, pressionCodec,& & & & & & buf, offset, buf.length);& & & & }& & & } catch (Throwable t) {& & & & String msg = "Unable to read call parameter from client " + getHostAddress();& & & & LOG.warn(msg, t);& & & & // probably the hbase hadoop version does not match the running hadoop version& & & & if (t instanceof LinkageError) {& & & & & t = new DoNotRetryIOException(t);& & & & }& & & & // If the method is not present on the server, do not retry.& & & & if (t instanceof UnsupportedOperationException) {& & & & & t = new DoNotRetryIOException(t);& & & & }& & & & final Call readParamsFailedCall =& & & & & new Call(id, this.service, null, null, null, this,& & & & & & responder, totalRequestSize, null);& & & & ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();& & & & setupResponse(responseBuffer, readParamsFailedCall, t,& & & & & msg + "; " + t.getMessage());& & & & responder.doRespond(readParamsFailedCall);& & & && & & }& & &//上面基本上是pb解析自己。& & & Call call =& & & if (header.hasTraceInfo()) {& & & & call = new Call(id, this.service, md, param, cellScanner, this,& & & & & responder, totalRequestSize, new TraceInfo(header.getTraceInfo().getTraceId(),& & & & & & header.getTraceInfo().getParentId()));& & & } else {& & & & call = new Call(id, this.service, md, param, cellScanner, this, responder,& & & & & totalRequestSize, null);& & & }& & & //根据解析的字节创建一个Call对象。这个说明,客户端在发送的时,也是封装为Call来发送的。& & & //更新全局调用队列queue的大小,callQueueSize是一个线程安全的记数器。& & & callQueueSize.add(totalRequestSize);& & & Pair&RequestHeader, Message& headerAndParam =& & & & new Pair&RequestHeader, Message&(header, param);& & & int level = getQosLevel(headerAndParam);& & & //这里是判断call的优先级,正常都是进入callQueue队列。如是优先级比较高的,比如client要处理meta表相关的rpc请求& & & //的优先级是高的,就进入了priorityCallQueue队列,友负责处理该队列的线程处理,不需要等待业务线程。& & & //还可以看到,hbase复制请求也是由专门的线程来负责的。& & & if (priorityCallQueue != null && level & highPriorityLevel) {& & & & priorityCallQueue.put(call);& & & } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {& & & & replicationQueue.put(call);& & & } else {& & & & //callQueue是阻塞队列,这里Reader线程涉及到锁的竞争。相比netty的线程模型,一个work线程一个队列,避免了锁的竞争。& & & & //但也有个不好的地方,就是如果一个请求出现阻塞,那后面的任务都被阻塞,即使其他线程闲得无事,也只能干巴巴的看& & & & //这里最好是如果有线程空闲了,就检测兄弟队列是否处理不过来,可以伸手帮一吧。这样能最大化利用线程。
//说到这里,我们应该可以禁用掉HotSpot JVM 默认启用的偏向锁,防止Hbase RegionServer频繁的发生偏向锁的撤消。
//偏向锁的撤消会带来性能消耗,因为要挂起持有偏向锁的线程。可以添加参数 -XX:-UseBiasedLocking来禁用。& & & & callQueue.put(call); // maybe blocked here& & & }& & }到这里,connection的任务就完成,他已经把字节反序列化成call,根据不同的优先级请求,放到不同的队列,等待线程来处理。下面Handler就登场了。Handler 线程,多个Handler是个线程,那业务逻辑就在run方法。代码如下:@Override& & public void run() {& & & (getName() + ": starting");& & & status.setStatus("starting");& & & SERVER.set(RpcServer.this);& & & while (running) {& & & & try {& & & & & status.pause("Waiting for a call");& & & & & //从全局队列取出Call,这里是Handler线程直接的锁竞争。& & & & & Call call = myCallQueue.take(); // maybe blocked here& & & & & if (!call.connection.channel.isOpen()) {& & & & & & if (LOG.isDebugEnabled()) {& & & & & & & LOG.debug(Thread.currentThread().getName() + ": skipped " + call);& & & & & & }& & & & & && & & & & }& & & & & status.setStatus("Setting up call");& & & & & status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());& & & & & if (LOG.isDebugEnabled()) {& & & & & & UserGroupInformation remoteUser = call.connection.& & & & & & LOG.debug(call.toShortString() + " executing as " +& & & & & & & ((remoteUser == null)? "NULL principal": remoteUser.getUserName()));& & & & & }& & & & & Throwable errorThrowable =& & & & & String error =& & & & & Pair&Message, CellScanner& resultPair =& & & & & CurCall.set(call);& & & & & TraceScope traceScope =& & & & & try {& & & & & & if (!started) {& & & & & & & throw new ServerNotRunningYetException("Server is not running yet");& & & & & & }& & & & & & if (call.tinfo != null) {& & & & & & & traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);& & & & & & }& & & & & & // make the call& & & & & RequestContext.set(userProvider.create(call.connection.user), getRemoteIp(),& & & & & & call.connection.service);& & & & & // make the call& & & & & //真正的RegionServer端的方法调用逻辑在这里。调用都是通过pb的代理来执行的。&& & & & & //这里有个需要注意的是:hbase.ipc.warn.response.time ,默认是10000,10秒,如果方法的执行时间大于该值,& & & & & //RS会打印日志,处理得太慢。性能可能出问题了。& & & & & resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,& & & & & & & status);& & & & & } catch (Throwable e) {& & & & & & LOG.debug(getName() + ": " + call.toShortString(), e);& & & & & & errorThrowable =& & & & & & error = StringUtils.stringifyException(e);& & & & & } finally {& & & & & & if (traceScope != null) {& & & & & & & traceScope.close();& & & & & & }& & & & & & // Must always clear the request context to avoid leaking& & & & & & // credentials between requests.& & & & & & RequestContext.clear();& & & & & }& & & & & CurCall.set(null);& & & & & //执行完方法后,需要更新调用队列的大小.& & & & & callQueueSize.add(call.getSize() * -1);& & & & & // Set the response for undelayed calls and delayed calls with& & & & & // undelayed responses.& & & & & if (!call.isDelayed() || !call.isReturnValueDelayed()) {& & & & & & Message param = resultPair != null? resultPair.getFirst():& & & & & & CellScanner cells = resultPair != null? resultPair.getSecond():& & & & & & call.setResponse(param, cells, errorThrowable, error);& & & & & }& & & & & &//1 把结果写给客户端。这里不是直接交给response线程来写,而是handler线程尝试先写。& & & & & // 2 如果没有写完,再在response的selector上注册写事件,并attach该call,到下次可写时,继续发送剩余的字节。& & & & & // 第二部分是response线程的任务,下面会讲到这部分。& & & & & call.sendResponseIfReady();& & & & & status.markComplete("Sent response");& & & & } catch (InterruptedException e) {& & & & & if (running) { // unexpected -- log it& & & & & & (getName() + ": caught: " + StringUtils.stringifyException(e));& & & & & }& & & & } catch (OutOfMemoryError e) {& & & & & if (errorHandler != null) {& & & & & & if (errorHandler.checkOOME(e)) {& & & & & & & (getName() + ": exiting on OutOfMemoryError");& & & & & & && & & & & & }& & & & & } else {& & & & & & // rethrow if no handler& & & & & && & & & & }& & & &} catch (ClosedChannelException cce) {& & & & & LOG.warn(getName() + ": caught a ClosedChannelException, " +& & & & & & "this means that the server was processing a " +& & & & & & "request but the client went away. The error message was: " +& & & & & & cce.getMessage());& & & & } catch (Exception e) {& & & & & LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e));& & & & }& & & }& & & (getName() + ": exiting");& & }Response 线程response负责所有connection的写,主要逻辑在doRunLoop方法里,代码如下:private void doRunLoop() {& & & long lastPurgeTime = 0; // last check for old calls.& & & while (running) {& & & & try {& & & & &//有注册写事件时,pending会加1,然后block住response线程,到注册成功后,pending会减1,& & & & &//并notify,释放response的锁。让response线程进入监听可写事件。& & & & & waitPending(); // If a channel is being registered, wait.& & & & &//purgeTimeout 的值默认是ipc.client.call.purge.timeout值的2倍,即2*60000,2分钟。& & & & & writeSelector.select(purgeTimeout);& & & & & Iterator&SelectionKey& iter = writeSelector.selectedKeys().iterator();& & & & & while (iter.hasNext()) {& & & & & & SelectionKey key = iter.next();& & & & & & iter.remove();& & & & & & try {& & & & & & & if (key.isValid() && key.isWritable()) {& & & & & & & & & //某个channel的写缓存区可写。后面分析。& & & & & & & & & doAsyncWrite(key);& & & & & & & }& & & & & & } catch (IOException e) {& & & & & & & (getName() + ": asyncWrite", e);& & & & & & }& & & & & }& & & & & //这里需要注意下,response线程有个清除机制,即在purgeTimeout的时间内,会检查所有注册的key上的call,& & & & & //如果call从注册上来时间起,经过purgeTimeout的时间还没有发送出去,则丢弃,同时close对应的connection& & & & & //这个挺野蛮的,应该是在这个时间内,client应该已经超时了,所以服务端再把这些发送到client端,已经没有意义,就做超时处理。& & & & & long now = System.currentTimeMillis();& & & & & if (now & lastPurgeTime + purgeTimeout) {& & & & & && & & & & }& & & & & //下面的代码都是处理purge的。& & & & & lastPurgeTime =& & & & & //& & & & & // If there were some calls that have not been sent out for a& & & & & // long time, discard them.& & & & & //& & & & & if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");& & & & & ArrayList&Call&& & & & & // get the list of channels from list of keys.& & & & & synchronized (writeSelector.keys()) {& & & & & & calls = new ArrayList&Call&(writeSelector.keys().size());& & & & & & iter = writeSelector.keys().iterator();& & & & & & while (iter.hasNext()) {& & & & & & & SelectionKey key = iter.next();& & & & & & & Call call = (Call)key.attachment();& & & & & & & if (call != null && key.channel() == call.connection.channel) {& & & & & & & & calls.add(call);& & & & & & & }& & & & & & }& & & & & }& & & & & for(Call call : calls) {& & & & & & try {& & & & & & & doPurge(call, now);& & & & & & } catch (IOException e) {& & & & & & & LOG.warn(getName() + ": error in purging old calls " + e);& & & & & & }& & & & & }& & & & } catch (OutOfMemoryError e) {& & & & & if (errorHandler != null) {& & & & & & if (errorHandler.checkOOME(e)) {& & & & & & & (getName() + ": exiting on OutOfMemoryError");& & & & & & && & & & & & }& & & & & } else {& & & & & & //& & & & & & // we can run out of memory if we have too many threads& & & & & & // log the event and sleep for a minute and give& & & & & & // some thread(s) a chance to finish& & & & & & //& & & & & & LOG.warn(getName() + ": OutOfMemoryError in server select", e);& & & & & & try { Thread.sleep(60000); } catch (Exception ignored) {}& & & & & }& & & & } catch (Exception e) {& & & & & LOG.warn(getName() + ": exception in Responder " +& & & & & & & & & &StringUtils.stringifyException(e));& & & & }& & & }& & & (getName() + ": stopped");& & }response 写数据。private void doAsyncWrite(SelectionKey key) throws IOException {
& & & Call call = (Call)key.attachment();
& & & if (call == null) {
& & & if (key.channel() != call.connection.channel) {
& & & & throw new IOException("doAsyncWrite: bad channel");
& & //这个地方加锁非常关键,会个handler线程竞争responseQueue的锁,因为RS rpc server是先直接写,
& & & //所以handler先会获取到call.connection.responseQueue的锁,当然如果call不是同一个connection,则不会。
& & & //注意直接写不是指写当前的call,是获取到锁后,也是需要从responseQueue队列中,取出上次为写完的call,继续写。
& & & //写的关键逻辑在processResponse方法。如果processResponse返回true,代表responseQueue队列只有一个待写的call,
& & & //而且写成功了。因为写队列responseQueue没有可写的了,所以需要清除该key上的写事件。以免发生不必要的系统调用。& & & synchronized(call.connection.responseQueue) {
& & & & if (processResponse(call.connection.responseQueue, false)) {
& & & & & try {
& & & & & & key.interestOps(0);
& & & & & } catch (CancelledKeyException e) {
& & & & & & /* The Listener/reader might have closed the socket.
& & & & & & &* We don't explicitly cancel the key, so not sure if this will
& & & & & & &* ever fire.
& & & & & & &* This warning could be removed.
& & & & & & &*/
& & & & & & LOG.warn("Exception while changing ops : " + e);
& & & & & }
& & }上面分析了doAsyncWrite代码,下面分析processResponse方法。processResponse的代码如下:private boolean processResponse(final LinkedList&Call& responseQueue, boolean inHandler)
& & throws IOException {
& & & boolean error =
& & & boolean done = // there is more data for this channel.
& & & int numE
& & & Call call =
& & & try {
& & & & //noinspection SynchronizationOnLocalVariableOrMethodParameter
& & & & //这个地方重新对responseQueue加锁,有点不明白,进入processResponse之前,都已经对responseQueue加过锁了。& & & & synchronized (responseQueue) {
& & & & & //
& & & & & // If there are no items for this channel, then we are done
& & & & & //
计算当前写队列的大小。如果没有,就直接返回true。取消写事件。& & & & & numElements = responseQueue.size();
& & & & & if (numElements == 0) {
& & & & & & error =
& & & & & & // no more data for this channel.
& & & & & }
& & & & & //
& & & & & // Extract the first call
& & & & & //
这里的call是handler添加到responseQueue的。& & & & & call = responseQueue.removeFirst();
& & & & & SocketChannel channel = call.connection.
& & & & & //
& & & & & // Send as much data as we can in the non-blocking fashion
& & & & & //
numBytes 小于0,代表socket channel出错了。也要返回true。& & & & & long numBytes = channelWrite(channel, call.response);
& & & & & if (numBytes & 0) {
& & & & & &
& & & & & }
& & & & & &//到这里说明发送了numBytes 个字节,hasRemaining()返回true,说明limit还大于opeisition,说明缓存区还有字节没有写完,& & & & & &//返回false,说明opeisition已经等于limit,即所有自己已经发送出去。& & & & & if (!call.response.hasRemaining()) {
& & & & & & call.connection.decRpcCount();
& & & & & & //noinspection RedundantIfStatement
& & & & & & //numElements 等于1,说明是只有一个call,返回true。取消写事件。& & & & & & if (numElements == 1) { // last call fully processes.
& & & & & & & done = // no more data for this channel.
& & & & & & } else {
& & & & & & & //这里是如果写队列里有多个带写的call,虽然该call写完了,但缓存区还是可写的。
& & & & & & & //也需要epoll调用一次,让selector拿到可写事件。继续来写。即有多少个call,
& & & & & & & //就会发生多少次epoll系统调用,个人觉得如果这里再继续写,直到写满,应该可以减少epoll系统调用的次数。
& & & & & & & //返回false是,不取消写事件。& & & & & & & done = // more calls pending to be sent.
& & & & & & }
& & & & & & if (LOG.isDebugEnabled()) {
& & & & & & & LOG.debug(getName() + ": callId: " + call.id + " wrote " + numBytes + " bytes.");
& & & & & & }
& & & & & } else {
& & & & & & //
& & & & & & // If we were unable to write the entire response out, then
& & & & & & // insert in Selector queue.
& & & & & & //
到这里是,当前call没有写完,重新把call添加到写队列responseQueue。& & & & & & // 这里可以说明缓存区已经写满了。& & & & & & call.connection.responseQueue.addFirst(call);
& & & & & //inHandler是false,& & & & & & if (inHandler) {
& & & & & & & // set the serve time when the response has to be sent later
& & & & & & & call.timestamp = System.currentTimeMillis();
& & & & & & & if (enqueueInSelector(call))
& & & & & & & & done =
& & & & & & }
& & & & & & if (LOG.isDebugEnabled()) {
& & & & & & & LOG.debug(getName() + call.toShortString() + " partially sent, wrote " +
& & & & & & & & numBytes + " bytes.");
& & & & & & }
& & & & & }
& & & & & error = // everything went off well
& & & } finally {
& & & & if (error && call != null) {
& & & & & LOG.warn(getName() + call.toShortString() + ": output error");
& & & & & done = // error. no more data for this channel.
& & & & & closeConnection(call.connection);
& & }上面分析了Response线程写call的情况。我们前面还说到,handler线程自己写的情况。是调用response线程的doRespond方法实现的。参数call就是handler执行完方法调用后的返回结果的封装。void doRespond(Call call) throws IOException {
& & & // set the serve time when the response has to be sent later
& & & call.timestamp = System.currentTimeMillis();
& & & boolean doRegister =
& & & //加锁保证写的call的顺序。构建链表的正确性。responseQueue是个链表
& & & //上面我们说了response线程也会获取responseQueue的锁。& & & synchronized (call.connection.responseQueue) {
& & & & //追加call到链表。等写完后会remove掉。如果没有写完call里字节,则需要继续注册写事件,等下次可写时,再写。
& & & & call.connection.responseQueue.addLast(call);
& & & & //如果responseQueue里就一个,那可以尝试先写。而不用去Selector上注册写事件。多一次系统调用。
& & & & if (call.connection.responseQueue.size() == 1) {
& & & & & //是否需要注册写事件
& & & & & doRegister = !processResponse(call.connection.responseQueue, false);
& & & //这里如果connection的写队列responseQueue有大于1个call待写,或者是processResponse返回false,& & & //前面我们分析了processResponse方法。在responseQueue写完一个call,且responseQueue队列的个数大于1,返回false& & & if (doRegister) {
& & & & enqueueInSelector(call);
& & }到这里hbase rpc server端基本分析完成。看完这偏文章,应该了解了hbase rpc server端的线程模型,实现原理。这里是基于0.96的版本,和0.94的版本查不多,除了序列化换成了google的pb。有些东西,你看了不写,总是会忘记,在别人问你的时候,我们总能找到我们做写的笔记。不然人家会认为你说谎。为了广大技术爱好者学习netty,在这里帮新浪微博@nettying宣传下他即将出版的新书 &netty权威指南&@nettying兄在华为NIO实践多年,这本书是他的技术和经验的一个结晶。如果你对JAVA NIO感兴趣,那感激关注他吧。读了这本书,你的技术定会有一个质的飞跃。51 节日快乐。
驰晨推荐阅读:
& 16:30:57
阅读(2329)|
用微信&&“扫一扫”
将文章分享到朋友圈。
用易信&&“扫一扫”
将文章分享到朋友圈。
历史上的今天
loftPermalink:'',
id:'fks_',
blogTitle:'Hbase rpc server 源码分析',
blogAbstract:'本文原创,转载请说明出处:http://ronxin999./blog/static//Hbase 各个组件的通信是通过PRC,并且自己基于NIO实现了一套PPC的实现。每个RS上都会开启socket监听客户端的请求。相当于服务端的实现,主要是由org.apache.hadoop.hbase.ipc.RpcServer 这个类来完成。在RS启动的时候会创建好。',
blogTag:'hbase,rpc,regionserver',
blogUrl:'blog/static/',
isPublished:1,
istop:false,
modifyTime:9,
publishTime:9,
permalink:'blog/static/',
commentCount:0,
mainCommentCount:0,
recommendCount:0,
bsrk:-100,
publisherId:0,
recomBlogHome:false,
currentRecomBlog:false,
attachmentsFileIds:[],
groupInfo:{},
friendstatus:'none',
followstatus:'unFollow',
pubSucc:'',
visitorProvince:'',
visitorCity:'',
visitorNewUser:false,
postAddInfo:{},
mset:'000',
remindgoodnightblog:false,
isBlackVisitor:false,
isShowYodaoAd:false,
hostIntro:'主要从事海量数据处理,搜索的工作,工作之余喜欢写点文章。总结经验。欢迎转载,但转载请说明出处。也可以关注我的新浪微博/jamvp,进行交流。',
hmcon:'1',
selfRecomBlogCount:'0',
lofter_single:''
{list a as x}
{if x.moveFrom=='wap'}
{elseif x.moveFrom=='iphone'}
{elseif x.moveFrom=='android'}
{elseif x.moveFrom=='mobile'}
${a.selfIntro|escape}{if great260}${suplement}{/if}
{list a as x}
推荐过这篇日志的人:
{list a as x}
{if !!b&&b.length>0}
他们还推荐了:
{list b as y}
转载记录:
{list d as x}
{list a as x}
{list a as x}
{list a as x}
{list a as x}
{if x_index>4}{break}{/if}
${fn2(x.publishTime,'yyyy-MM-dd HH:mm:ss')}
{list a as x}
{if !!(blogDetail.preBlogPermalink)}
{if !!(blogDetail.nextBlogPermalink)}
{list a as x}
{if defined('newslist')&&newslist.length>0}
{list newslist as x}
{if x_index>7}{break}{/if}
{list a as x}
{var first_option =}
{list x.voteDetailList as voteToOption}
{if voteToOption==1}
{if first_option==false},{/if}&&“${b[voteToOption_index]}”&&
{if (x.role!="-1") },“我是${c[x.role]}”&&{/if}
&&&&&&&&${fn1(x.voteTime)}
{if x.userName==''}{/if}
网易公司版权所有&&
{list x.l as y}
{if defined('wl')}
{list wl as x}{/list}

我要回帖

更多关于 新浪微博 error 21338 的文章

 

随机推荐