hbase increment代码

hbase increase    

 

increase代码

1.将数据封装为increment对象

2.从increment对象中封装get

3.封装新kv

4.对HRegion下的Storm做upsert或add操作

5.查看是否需要flush并添加队列

6.返回kvs

 

HRegion代码,如下

 

  /*** Perform one or more increment operations on a row.* @param increment* @return new keyvalues after increment* @throws IOException*/public Result increment(Increment increment, long nonceGroup, long nonce)throws IOException {byte [] row = increment.getRow();checkRow(row, "increment");TimeRange tr = increment.getTimeRange();boolean flush = false;Durability durability = getEffectiveDurability(increment.getDurability());boolean writeToWAL = durability != Durability.SKIP_WAL;WALEdit walEdits = null;List allKVs = new ArrayList(increment.size());Map> tempMemstore = new HashMap>();long size = 0;long txid = 0;checkReadOnly();checkResources();// Lock rowstartRegionOperation(Operation.INCREMENT);this.writeRequestsCount.increment();WriteEntry w = null;try {RowLock rowLock = getRowLock(row);try {lock(this.updatesLock.readLock());// wait for all prior MVCC transactions to finish - while we hold the row lock// (so that we are guaranteed to see the latest state)mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());// now start my own transactionw = mvcc.beginMemstoreInsert();try {long now = EnvironmentEdgeManager.currentTimeMillis();// Process each familyfor (Map.Entry> family:increment.getFamilyCellMap().entrySet()) {Store store = stores.get(family.getKey());List kvs = new ArrayList(family.getValue().size());// Sort the cells so that they match the order that they// appear in the Get results. Otherwise, we won't be able to// find the existing values if the cells are not specified// in order by the client since cells are in an array list.Collections.sort(family.getValue(), store.getComparator());// Get previous values for all columns in this family// 从increment封装get请求Get get = new Get(row);for (Cell cell: family.getValue()) {KeyValue kv = KeyValueUtil.ensureKeyValue(cell);get.addColumn(family.getKey(), kv.getQualifier());}get.setTimeRange(tr.getMin(), tr.getMax());List results = get(get, false);//获得此increase的row// Iterate the input columns and update existing values if they were// found, otherwise add new column initialized to the increment amountint idx = 0;for (Cell kv: family.getValue()) {long amount = Bytes.toLong(CellUtil.cloneValue(kv));//获得当前value值boolean noWriteBack = (amount == 0);Cell c = null;if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {c = results.get(idx);if(c.getValueLength() == Bytes.SIZEOF_LONG) {amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);} else {// throw DoNotRetryIOException instead of IllegalArgumentExceptionthrow new org.apache.hadoop.hbase.DoNotRetryIOException("Attempted to increment field that isn't 64 bits wide");}idx++;}// Append new incremented KeyValue to listbyte[] q = CellUtil.cloneQualifier(kv);byte[] val = Bytes.toBytes(amount);int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();int incCellTagsLen = kv.getTagsLength();KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),family.getKey().length);System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);// copy in the valueSystem.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);// copy tagsif (oldCellTagsLen > 0) {System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),newKV.getTagsOffset(), oldCellTagsLen);}if (incCellTagsLen > 0) {System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);}newKV.setMvccVersion(w.getWriteNumber());// Give coprocessors a chance to update the new cellif (coprocessorHost != null) {newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));}allKVs.add(newKV);if (!noWriteBack) {kvs.add(newKV);// Prepare WAL updatesif (writeToWAL) {if (walEdits == null) {walEdits = new WALEdit();}walEdits.add(newKV);}}}//store the kvs to the temporary memstore before writing HLogif (!kvs.isEmpty()) {tempMemstore.put(store, kvs);}}// Actually write to WAL nowif (walEdits != null && !walEdits.isEmpty()) {if (writeToWAL) {// Using default cluster id, as this can only happen in the orginating// cluster. A slave cluster receives the final value (not the delta)// as a Put.txid = this.log.appendNoSync(this.getRegionInfo(),this.htableDescriptor.getTableName(), walEdits, new ArrayList(),EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,true, nonceGroup, nonce);} else {recordMutationWithoutWal(increment.getFamilyCellMap());}}//Actually write to Memstore nowif (!tempMemstore.isEmpty()) {//更新hbase kvfor (Map.Entry> entry : tempMemstore.entrySet()) {Store store = entry.getKey();if (store.getFamily().getMaxVersions() == 1) {// upsert if VERSIONS for this CF == 1size += store.upsert(entry.getValue(), getSmallestReadPoint());} else {// otherwise keep older versions aroundfor (Cell cell : entry.getValue()) {KeyValue kv = KeyValueUtil.ensureKeyValue(cell);size += store.add(kv);}}}size = this.addAndGetGlobalMemstoreSize(size);flush = isFlushSize(size);}} finally {this.updatesLock.readLock().unlock();}} finally {rowLock.release();}if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {// sync the transaction log outside the rowlocksyncOrDefer(txid, durability);}} finally {if (w != null) {mvcc.completeMemstoreInsert(w);}closeRegionOperation(Operation.INCREMENT);if (this.metricsRegion != null) {this.metricsRegion.updateIncrement();}}if (flush) {// Request a cache flush.  Do it outside update lock.requestFlush();}return Result.create(allKVs);}

 

在一些情况下,如increment压力过大时,会出现下列错误,startNonceOperation方法:

 

 regionserver.ServerNonceManager: Conflict detected by nonce

 

一个mutation里边有多个相同的nonce的操作,如increment,这样会产生此日志,影响相应速度

 

 

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部