这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,11 @@ typedef struct {
* pointers are worthwhile moving and which aren't */
int je_get_defrag_hint(void* ptr);

/* Defrag helper for generic allocations.
/* Defrag helper for generic allocations without freeing old pointer.
*
* returns NULL in case the allocation wasn't moved.
* when it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
void* activeDefragAlloc(void *ptr) {
* Note: The caller is responsible for freeing the old pointer if this function
* returns a non-NULL value. */
void* activeDefragAllocWithoutFree(void *ptr) {
size_t size;
void *newptr;
if(!je_get_defrag_hint(ptr)) {
Expand All @@ -159,19 +158,34 @@ void* activeDefragAlloc(void *ptr) {
size = zmalloc_usable_size(ptr);
newptr = zmalloc_no_tcache(size);
memcpy(newptr, ptr, size);
zfree_no_tcache(ptr);
server.stat_active_defrag_hits++;
return newptr;
}

/* Release a pointer through zfree_no_tcache().
 *
 * Counterpart of activeDefragAllocWithoutFree(): when that function returns
 * a non-NULL value the caller still owns the old pointer and is expected to
 * free it, which is what this helper is for. */
void activeDefragFree(void *ptr)
{
    zfree_no_tcache(ptr);
}

/* Generic defrag helper: relocate an allocation and release the original.
 *
 * Returns NULL when the allocation was not moved; in that case `ptr` is left
 * untouched. A non-NULL return value is the relocated copy, and by the time
 * it is returned the old pointer has already been freed, so it must NOT be
 * accessed anymore. */
void* activeDefragAlloc(void *ptr) {
    void *moved = activeDefragAllocWithoutFree(ptr);

    /* Nothing was relocated, so the caller keeps using the old pointer. */
    if (moved == NULL) return NULL;

    /* The content now lives in `moved`; drop the old allocation. */
    activeDefragFree(ptr);
    return moved;
}

/* Allocate `size` bytes of raw memory for defrag purposes.
 *
 * The request goes through zmalloc_no_tcache() so that the tcache is not
 * used. Returns whatever that allocator call returns. */
void *activeDefragAllocRaw(size_t size) {
    void *mem = zmalloc_no_tcache(size);
    return mem;
}

/* Raw memory free for defrag, avoid using tcache.
 *
 * Releases `ptr` exactly once, through the shared activeDefragFree() helper,
 * and counts the operation in server.stat_active_defrag_hits.
 * The pointer must not be freed a second time here: a direct
 * zfree_no_tcache(ptr) next to activeDefragFree(ptr) would be a double free. */
void activeDefragFreeRaw(void *ptr) {
    activeDefragFree(ptr);
    server.stat_active_defrag_hits++;
}

Expand Down Expand Up @@ -825,6 +839,7 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
PendingEntryContext *ctx = privdata;
streamNACK *nack = ri->data, *newnack;
nack->consumer = ctx->c; /* update nack pointer to consumer */
nack->cgroup_ref_node->value = ctx->cg; /* Update the value of cgroups_ref node to the consumer group. */
newnack = activeDefragAlloc(nack);
if (newnack) {
/* update consumer group pointer to the nack */
Expand Down Expand Up @@ -853,13 +868,15 @@ void* defragStreamConsumer(raxIterator *ri, void *privdata) {
}

/* Rax element callback that defrags one stream consumer group.
 *
 * ri       - iterator positioned on the consumer group; ri->data is the
 *            streamCG to process.
 * privdata - unused.
 *
 * The streamCG allocation itself is offered to activeDefragAlloc() first, so
 * that the nested radix trees are processed through the (possibly relocated)
 * structure. The consumers tree is walked with defragStreamConsumer, which
 * receives the current consumer group pointer as its private data; the pel
 * tree is walked without an element callback.
 *
 * Returns the consumer group pointer that is current after the attempt: the
 * relocated one if the allocation was moved, otherwise the original. */
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
    streamCG *newcg, *cg = ri->data;
    UNUSED(privdata);

    /* From here on only `cg` is used; the old pointer is gone if moved. */
    if ((newcg = activeDefragAlloc(cg)))
        cg = newcg;

    if (cg->consumers)
        defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
    if (cg->pel)
        defragRadixTree(&cg->pel, 0, NULL, NULL);
    return cg;
}

void defragStream(defragKeysCtx *ctx, kvobj *ob) {
Expand All @@ -879,7 +896,7 @@ void defragStream(defragKeysCtx *ctx, kvobj *ob) {
defragRadixTree(&s->rax, 1, NULL, NULL);

if (s->cgroups)
defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
defragRadixTree(&s->cgroups, 0, defragStreamConsumerGroup, NULL);
}

/* Defrag a module key. This is either done immediately or scheduled
Expand Down Expand Up @@ -1275,7 +1292,7 @@ static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) {
void *activeDefragHExpiresOB(void *ptr, void *privdata) {
redisDb *db = privdata;
dictEntryLink link, exlink = NULL;
kvobj *kvobj = ptr;
kvobj *newkv, *kvobj = ptr;
sds keystr = kvobjGetKey(kvobj);
unsigned int slot = calculateKeySlot(keystr);
serverAssert(kvobj->type == OBJ_HASH);
Expand All @@ -1289,15 +1306,16 @@ void *activeDefragHExpiresOB(void *ptr, void *privdata) {
serverAssert(exlink != NULL);
}

if ((kvobj = activeDefragAlloc(kvobj))) {
if ((newkv = activeDefragAllocWithoutFree(kvobj))) {
/* Update its reference in the DB keys. */
link = kvstoreDictFindLink(db->keys, slot, keystr, NULL);
serverAssert(link != NULL);
kvstoreDictSetAtLink(db->keys, slot, kvobj, &link, 0);
kvstoreDictSetAtLink(db->keys, slot, newkv, &link, 0);
if (expire != -1)
kvstoreDictSetAtLink(db->expires, slot, kvobj, &exlink, 0);
kvstoreDictSetAtLink(db->expires, slot, newkv, &exlink, 0);
activeDefragFree(kvobj);
}
return kvobj;
return newkv;
}

static doneStatus defragStageHExpires(void *ctx, monotime endtime) {
Expand Down
3 changes: 3 additions & 0 deletions src/ebuckets.c
Original file line number Diff line number Diff line change
Expand Up @@ -1844,6 +1844,7 @@ void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
ebDefragFunctions *defragfns, void *privdata)
{
CommonSegHdr *currentSegHdr = ri->data;
CommonSegHdr *firstSegHdr = currentSegHdr;
eItem iter = ((FirstSegHdr*)currentSegHdr)->head;
ExpireMeta *mHead = type->getExpireMeta(iter);
ExpireMeta *prevSegLastItem = NULL; /* The last item of the previous segment */
Expand Down Expand Up @@ -1879,6 +1880,7 @@ void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
if (currentSegHdr == ri->data) {
/* If it's the first segment, update the rax data pointer. */
raxSetData(ri->node, ri->data=newSegHdr);
firstSegHdr = newSegHdr;
} else {
/* For non-first segments, update the previous segment's next
* item to new pointer. */
Expand All @@ -1897,6 +1899,7 @@ void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
}

NextSegHdr *nextSegHdr = mIter->next;
nextSegHdr->firstSeg = (FirstSegHdr *)firstSegHdr;
if (newSegHdr) {
/* Update next segment's prev to point to the defragmented segment. */
nextSegHdr->prevSeg = newSegHdr;
Expand Down
Loading