Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
#include <stdatomic.h>
#include "benchmark.h"

DlschInd dlsch_ind __attribute__((section(".data")));
UeStateRpt ue_status_rpt_content __attribute__((section(".data")));

static inline size_t memdiff32(const void *a, const void *b, size_t len_bytes) {
const uint8_t *p = (const uint8_t *)a;
const uint8_t *q = (const uint8_t *)b;
Expand Down Expand Up @@ -223,82 +226,134 @@ int __attribute__((noinline)) pdcp_receive_pkg(const unsigned int core_id, volat
*/
#define PACKET_SIZE (PAGE_SIZE - sizeof(Node))

/* Consumer behavior (runs on core 0) */
static void consumer(const unsigned int core_id) {
while (1) {
Node *node = list_pop_front(&tosend_llist_lock_2, &rlc_ctx.list);
if (node != 0) {
// DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
// DEBUG_PRINTF("Consumer (core %u): processing node %p, data_size = %zu, data_src = 0x%x, data_tgt = 0x%x, @mcycle = %d\n",
// core_id, (void *)node, node->data_size, node->data, node->tgt, benchmark_get_cycle());
// DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);

// delay(100); /* Simulate processing delay */

uint32_t timer_mv_0, timer_mv_1;
// timer_mv_0 = benchmark_get_cycle();
// vector_memcpy32_m4_opt(node->tgt, node->data, node->data_size);
// vector_memcpy32_m8_opt(node->tgt, node->data, node->data_size);
// scalar_memcpy32_32bit_unrolled(node->tgt, node->data, node->data_size);
// vector_memcpy32_m8_m4_general_opt(node->tgt, node->data, node->data_size);
// vector_memcpy32_1360B_opt(node->tgt, node->data);
vector_memcpy32_1360B_opt_with_header(node->tgt, node->data, rlc_ctx.vtNext);
// timer_mv_1 = benchmark_get_cycle();

// Update the RLC struct variables
// atomic_fetch_add_explicit(&rlc_ctx.pduWithoutPoll, 1, memory_order_relaxed);
// atomic_fetch_add_explicit(&rlc_ctx.byteWithoutPoll, node->data_size, memory_order_relaxed);
rlc_ctx.pduWithoutPoll += 1;
rlc_ctx.byteWithoutPoll += node->data_size;
// Increment the next available RLC sequence number
rlc_ctx.vtNext += 1;
// atomic_fetch_add_explicit(&rlc_ctx.vtNext, 1, memory_order_relaxed);


// DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
// DEBUG_PRINTF("Consumer (core %u): move node %p from data_src = 0x%x to data_tgt = 0x%x, data_size = %zu, cyc = %d, bw = %dB/1000cyc\n",
// core_id, (void *)node, node->data, node->tgt, node->data_size,
// (timer_mv_1 - timer_mv_0),
// (node->data_size * 1000 / (timer_mv_1 - timer_mv_0)));
// DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);

// Add the node to the sent list
list_push_back(&sent_llist_lock_2, &rlc_ctx.sent_list, node);
#define PRODUCER_CORE_NUM 2
#define CONSUMER_CORE_NUM 2
#define CPU_FREQENCY 2200000
#define OUTPUT_DATARATE 7000000
#define INPUT_DATARATE 7000000

/*
 * Simulate receiving an RLC status report (ACK) from the UE.
 *
 * Once at least two PDUs sit in the sent list, assume the UE acknowledges
 * the next two sequence numbers (ACK_SN = vtNextAck + 2): pop and free the
 * corresponding nodes from the sent list, then refresh the report
 * bookkeeping fields and the per-SN delay-info cacheline.
 *
 * core_id - calling core, used only in the error trace.
 *
 * NOTE(review): this block was reconstructed from a garbled diff with
 * duplicated hunks — verify against the repository before merging.
 */
void ue_status_rpt(const unsigned int core_id)
{
    // Simulate receiving ACK from UE after certain sent pkgs, and we assume the ACK_SN is rlc_ctx.vtNextAck+2
    if (rlc_ctx.sent_list.sduNum >= 2) {
        /* Touch the report header bytes (simulated parse of the status PDU). */
        char head  = ue_status_rpt_content.stateRpt[0];
        char head1 = ue_status_rpt_content.stateRpt[1];
        char head2 = ue_status_rpt_content.stateRpt[2];
        (void)head; (void)head1; (void)head2; /* read-only simulation, values unused */

        uint32_t vtNextAck = atomic_load_explicit(&rlc_ctx.vtNextAck, memory_order_relaxed);
        int ACK_SN = vtNextAck + 2; // Assume each time ack 2 sent pkgs

        for (int i = vtNextAck; i < ACK_SN; i++) {
            /* Simulated per-SN ACK byte from the report body. */
            char ack = ue_status_rpt_content.stateRpt[16 + ACK_SN - i];
            (void)ack;
            Node *sent_node = list_pop_front(&sent_llist_lock_2, &rlc_ctx.sent_list);
            if (sent_node == NULL) {
                DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
                DEBUG_PRINTF("[core %u][consumer] ERROR: pop sent_list, ACK_SN=%d, SN=%d, but sent_node is NULL\n",
                             core_id, ACK_SN, i);
                DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);
            }
            mm_free(sent_node); // Free the sent node memory (assumes mm_free(NULL) is a no-op — confirm)
        }

        /* Refresh report bookkeeping. */
        rlc_ctx.acksn = ACK_SN;
        rlc_ctx.nackcount = 0;
        rlc_ctx.parseindex++;
        atomic_store_explicit(&rlc_ctx.vtNextAck, ACK_SN, memory_order_relaxed); // Update the next ACK sequence number
        /* Write one cacheline of delay info. */
        for (uint32_t i = 0; i < 16; i++) {
            rlc_ctx.dlDelayInfo[i] = 300;
        }
    }
}

/*
 * Pop one pending SDU from the to-send list, copy its payload to the target
 * buffer (prepending the current RLC SN, rlc_ctx.vtNext, as header), update
 * the RLC accounting state, snapshot DFX counters into *testData, and move
 * the node onto the sent list for later ACK-driven recycling.
 *
 * core_id  - calling core (currently unused outside debug traces).
 * testData - DFX/statistics snapshot filled from rlc_ctx counters.
 *
 * Fix: removed the dead `uint32_t timer_mv_0, timer_mv_1;` declaration —
 * its only uses were commented-out benchmark traces, so it only produced
 * an unused-variable warning.
 *
 * NOTE(review): the plain (non-atomic) updates to pduWithoutPoll, vtNext,
 * pdcpcount, ... are safe only if a single consumer core calls this —
 * confirm against cluster_entry's core dispatch.
 */
static void rlc_send_pkt(const unsigned int core_id, TestDataStru *testData)
{
    Node *node = list_pop_front(&tosend_llist_lock_2, &rlc_ctx.list);
    if (node != 0) {
        /* Copy payload to target with the SN header. Alternative memcpy
         * variants kept for benchmarking: vector_memcpy32_m4_opt,
         * vector_memcpy32_m8_opt, scalar_memcpy32_32bit_unrolled,
         * vector_memcpy32_m8_m4_general_opt, vector_memcpy32_1360B_opt. */
        vector_memcpy32_1360B_opt_with_header(node->tgt, node->data, rlc_ctx.vtNext);

        /* Update the RLC struct variables (single-writer, plain stores). */
        rlc_ctx.pduWithoutPoll += 1;
        rlc_ctx.byteWithoutPoll += node->data_size;
        /* Increment the next available RLC sequence number. */
        rlc_ctx.vtNext += 1;
        rlc_ctx.sendPduNum +=1;
        rlc_ctx.sendPduBytes += node->data_size;
        /* +10 presumably accounts for per-PDU header overhead — TODO confirm. */
        atomic_fetch_add_explicit(&rlc_ctx.tbsize, (node->data_size + 10), memory_order_relaxed);
        /* read one cacheline from node mem */
        RcvPktHeader tmp = *(RcvPktHeader *)node->data;
        /* write one cacheline to node mem */
        *((RcvPktHeader *)node->data + 1) = tmp;
        rlc_ctx.pdcpcount++;
        atomic_fetch_add_explicit(&rlc_ctx.rlcthrp, node->data_size, memory_order_relaxed);
        atomic_fetch_add_explicit(&rlc_ctx.dlPduNum, 1, memory_order_relaxed);
        /* The SDU leaves the pending queue: decrement byte/packet counts. */
        atomic_fetch_add_explicit(&rlc_ctx.sduBytes, (0 - node->data_size), memory_order_relaxed);
        atomic_fetch_add_explicit(&rlc_ctx.sduNum, (-1), memory_order_relaxed);
        /* Snapshot DFX counters for the caller. */
        testData->sduNum = rlc_ctx.sduNum;
        testData->rlcDpbPduCnt = rlc_ctx.pdcpcount;
        testData->sudBytes = rlc_ctx.sduBytes;
        testData->totalPdlLen = rlc_ctx.tbsize;
        /* write one cacheline data to node mem */
        for (uint32_t i = 0; i < 16; i++) {
            rlc_ctx.rlcOm[i] = 20;
        }

        /* Add the node to the sent list for ACK-driven recycling. */
        list_push_back(&sent_llist_lock_2, &rlc_ctx.sent_list, node);
    }
    /* Empty-list case intentionally falls through: the pacing loop in
     * consumer() retries on the next iteration. */
}

/* Consumer behavior: rate-paced transmit loop. Each iteration sends one
 * packet via rlc_send_pkt() and then delays so that the loop period matches
 * the cycle budget implied by OUTPUT_DATARATE.
 * NOTE(review): header comment said "runs on core 0" but cluster_entry
 * dispatches consumer on core 3 — confirm which is current. */
static void consumer(const unsigned int core_id) {
    /* Cycles available per PDU at the target output data rate. */
    uint32_t total_cycle = PRODUCER_CORE_NUM * PDU_SIZE * CPU_FREQENCY / OUTPUT_DATARATE;
    while (1) {
        uint32_t start_timecycle = benchmark_get_cycle();
        TestDataStru dfx = {0};
        dfx.dlschInd = dlsch_ind;
        rlc_send_pkt(core_id, &dfx);
        /* FIX: was declared `start_endcycle`, leaving the `end_timecycle`
         * read below undefined (compile error). */
        uint32_t end_timecycle = benchmark_get_cycle();
        /* calculate delay interval */
        uint32_t interval = end_timecycle - start_timecycle;
        rlc_ctx.pktdelay = interval;
        /* Sleep for the unused remainder of the per-PDU cycle budget. */
        uint32_t delayCycle = (total_cycle >= interval) ? (total_cycle - interval) : 0;
        delay(delayCycle);
    }
}

Expand All @@ -314,76 +369,113 @@ static void producer(const unsigned int core_id) {
// pdcp_src_data[NUM_SRC_SLOTS-1][PDU_SIZE-1],
// benchmark_get_cycle());
// DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);
rlc_ctx.firstSduPktRxCycle = benchmark_get_cycle();
int new_pdcp_pkg_ptr = pdcp_receive_pkg(core_id, &pdcp_pkd_ptr_lock);
while (new_pdcp_pkg_ptr >= 0) {
// DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
// DEBUG_PRINTF("Producer (core %u): pdcp_receive_pkg id = %d, user_id = %d, pkg_length = %d, src_addr = 0x%x, tgt_addr = 0x%x\n",
// core_id,
// new_pdcp_pkg_ptr,
// pdcp_pkgs[new_pdcp_pkg_ptr].user_id,
// pdcp_pkgs[new_pdcp_pkg_ptr].pkg_length,
// pdcp_pkgs[new_pdcp_pkg_ptr].src_addr,
// pdcp_pkgs[new_pdcp_pkg_ptr].tgt_addr);
// DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);

// DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
// DEBUG_PRINTF("Producer (core %u): pdcp_receive_pkg id = %d, user_id = %d, pkg_length = %d, src_addr = 0x%x, tgt_addr = 0x%x\n",
// core_id,
// new_pdcp_pkg_ptr,
// pdcp_pkgs[new_pdcp_pkg_ptr].user_id,
// pdcp_pkgs[new_pdcp_pkg_ptr].pkg_length,
// pdcp_pkgs[new_pdcp_pkg_ptr].src_addr,
// pdcp_pkgs[new_pdcp_pkg_ptr].tgt_addr);
// DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);

Node *node = (Node *)mm_alloc();
if (!node) {

DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
DEBUG_PRINTF("Producer (core %u): Out of memory\n", core_id);
DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);
Node *node = (Node *)mm_alloc();
if (!node) {

delay(200); /* Delay before retrying */
continue;
}
DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
DEBUG_PRINTF("Producer (core %u): Out of memory\n", core_id);
DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);

uint32_t timer_body_0, timer_body_1;
delay(200); /* Delay before retrying */
return;
}

// timer_body_0 = benchmark_get_cycle();
/* Initialize the node header */
node->lock = 0;
node->prev = 0;
node->next = 0;
/* Set the payload pointer immediately after the Node structure */
uint32_t timer_body_0, timer_body_1;

// timer_body_0 = benchmark_get_cycle();
/* Initialize the node header */
node->lock = 0;
node->prev = 0;
node->next = 0;
/* Set the payload pointer immediately after the Node structure */
if (new_pdcp_pkg_ptr >= 0) {
node->data = (void *)((uint8_t *)(pdcp_pkgs[new_pdcp_pkg_ptr].src_addr));
node->tgt = (void *)((uint8_t *)(pdcp_pkgs[new_pdcp_pkg_ptr].tgt_addr));
node->data_size = pdcp_pkgs[new_pdcp_pkg_ptr].pkg_length;
// timer_body_1 = benchmark_get_cycle();
/* read one cacheline from node mem */
RcvPktHeader tmp = *(RcvPktHeader *)node->data;
unsigned int pingflag = rlc_ctx.pingFlag;
atomic_store_explicit(&rlc_ctx.pingFlag, pingflag, memory_order_relaxed);
atomic_store_explicit(&rlc_ctx.recvMaxByte, node->data_size, memory_order_relaxed);
(RcvPktHeader *)pt = ((RcvPktHeader *))((char *)node->data + sizeof(RcvPktHeader));
/* write one cacheline */
tmp = *pt;
*(pt + 1) = tmp;
atomic_fetch_add_explicit(&rlc_ctx.sduNumCong, 1, memory_order_relaxed);
atomic_store_explicit(&rlc_ctx.sudCongState, 1, memory_order_relaxed);
atomic_fetch_add_explicit(&rlc_ctx.pktdelayEnqueFlag, 1, memory_order_relaxed);
}
// timer_body_1 = benchmark_get_cycle();

// DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
// DEBUG_PRINTF("[core %u][bd fill_node] mm_alloc: node = %p, data = 0x%x, tgt = 0x%x, data_size = %zu, bd=%d\n",
// core_id,
// (void *)node,
// node->data,
// node->tgt,
// node->data_size,
// (timer_body_1 - timer_body_0)
// );
// DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);
// DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
// DEBUG_PRINTF("[core %u][bd fill_node] mm_alloc: node = %p, data = 0x%x, tgt = 0x%x, data_size = %zu, bd=%d\n",
// core_id,
// (void *)node,
// node->data,
// node->tgt,
// node->data_size,
// (timer_body_1 - timer_body_0)
// );
// DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);


// /* Zero-initialize the payload using our custom mm_memset */
// mm_memset(node->data, 0, PACKET_SIZE);
/* Append the node to the shared linked list */
list_push_back(&tosend_llist_lock_2, &rlc_ctx.list, node);
// /* Zero-initialize the payload using our custom mm_memset */
// mm_memset(node->data, 0, PACKET_SIZE);
/* Append the node to the shared linked list */
list_push_back(&tosend_llist_lock_2, &rlc_ctx.list, node);
atomic_fetch_add_explicit(&rlc_ctx.sduBytes, node->data_size, memory_order_relaxed);
atomic_fetch_add_explicit(&rlc_ctx.sduNum, 1, memory_order_relaxed);
atomic_fetch_add_explicit(&rlc_ctx.recvPdcpPduBytes, node->data_size, memory_order_relaxed);
rlc_ctx.lastRcvOrSubmitDataCyc = benchmark_get_cycle() - rlc_ctx.firstSduPktRxCycle;

DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
DEBUG_PRINTF("Producer (core %u): added node %p, size = %d, src_addr = 0x%x, tgt_addr = 0x%x\n",
core_id,
(void *)node,
node->data_size,
node->data,
node->tgt);
DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);
atomic_fetch_add_explicit(&rlc_ctx.rcvPktNums, 1, memory_order_relaxed);
atomic_fetch_add_explicit(&rlc_ctx.rcvPktLength, node->data_size, memory_order_relaxed);
atomic_fetch_add_explicit(&rlc_ctx.enQuePktNum, 1, memory_order_relaxed);
atomic_fetch_add_explicit(&rlc_ctx.enQuePktLength, node->data_size, memory_order_relaxed);

// Get the pointer to the next PDCP package
new_pdcp_pkg_ptr = pdcp_receive_pkg(core_id, &pdcp_pkd_ptr_lock);
// delay(200); /* Delay between node productions */
}
DEBUG_PRINTF_LOCK_ACQUIRE(&printf_lock);
DEBUG_PRINTF("Producer (core %u): added node %p, size = %d, src_addr = 0x%x, tgt_addr = 0x%x\n",
core_id,
(void *)node,
node->data_size,
node->data,
node->tgt);
DEBUG_PRINTF_LOCK_RELEASE(&printf_lock);

// delay(200); /* Delay between node productions */
// Set producer done flag
atomic_store_explicit(&producer_done, 1, memory_order_relaxed);
// atomic_store_explicit(&producer_done, 1, memory_order_relaxed);
}

/* Producer-side pacing loop: each iteration produces/enqueues packets via
 * producer() and, on core 0 only, processes the simulated UE status report;
 * the iteration is then padded out to the cycle budget implied by
 * INPUT_DATARATE so producers run at a fixed input rate. */
static void pkt_production_and_recycle(const unsigned int core_id)
{
    /* Cycles available per PDU at the target input data rate. */
    const uint32_t cycle_budget = PRODUCER_CORE_NUM * PDU_SIZE * CPU_FREQENCY / INPUT_DATARATE;
    for (;;) {
        const uint32_t t_begin = benchmark_get_cycle();
        producer(core_id);
        if (core_id == 0) { /* ue status report runs in one core */
            ue_status_rpt(core_id);
        }
        const uint32_t t_end = benchmark_get_cycle();
        /* Pad out the remainder of this iteration's cycle budget. */
        const uint32_t elapsed = t_end - t_begin;
        delay((cycle_budget >= elapsed) ? (cycle_budget - elapsed) : 0);
    }
}

/* cluster_entry() dispatches behavior based on core_id */
Expand All @@ -395,10 +487,10 @@ void cluster_entry(const unsigned int core_id) {
start_kernel();
}

if (core_id >= 2) {
if (core_id == 3) { /* consumer runs in one core */
consumer(core_id);
} else /*if (core_id == 0)*/ {
producer(core_id);
} else /* produce runs in multi core */ {
pkt_production_and_recycle(core_id);
}/* else {
while (1) {}
}*/
Expand Down
Loading