Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion external/c-libp2p
6 changes: 1 addition & 5 deletions include/lantern/networking/reqresp_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@

#define LANTERN_REQRESP_STATUS_PROTOCOL_SNAPPY "/leanconsensus/req/status/1/ssz_snappy"
#define LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL_SNAPPY "/leanconsensus/req/blocks_by_root/1/ssz_snappy"
#define LANTERN_REQRESP_LEAN_BLOCKS_BY_ROOT_PROTOCOL_SNAPPY "/leanconsensus/req/lean_blocks_by_root/1/ssz_snappy"
#define LANTERN_REQRESP_STATUS_PROTOCOL LANTERN_REQRESP_STATUS_PROTOCOL_SNAPPY
#define LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL LANTERN_REQRESP_LEAN_BLOCKS_BY_ROOT_PROTOCOL_SNAPPY
#define LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL_SNAPPY
#define LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL_SNAPPY
#define LANTERN_REQRESP_STATUS_PREVIEW_BYTES 256u
#define LANTERN_REQRESP_MAX_CHUNK_BYTES (10u * 1024u * 1024u)
#define LANTERN_REQRESP_MAX_CONTEXT_BYTES (1u << 20)
Expand Down Expand Up @@ -57,7 +55,6 @@ enum lantern_reqresp_protocol_kind {

#define LANTERN_STATUS_PROTOCOL_ID LANTERN_REQRESP_STATUS_PROTOCOL
#define LANTERN_BLOCKS_BY_ROOT_PROTOCOL_ID LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL
#define LANTERN_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK_ID LANTERN_REQRESP_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK
#define LANTERN_STATUS_PREVIEW_BYTES LANTERN_REQRESP_STATUS_PREVIEW_BYTES

struct libp2p_host;
Expand Down Expand Up @@ -93,7 +90,6 @@ struct lantern_reqresp_service {
struct lantern_reqresp_service_callbacks callbacks;
struct libp2p_protocol_server *status_server;
struct libp2p_protocol_server *blocks_server;
struct libp2p_protocol_server *blocks_server_legacy;
struct libp2p_subscription *event_subscription;
int lock_initialized;
pthread_mutex_t lock;
Expand Down
1 change: 0 additions & 1 deletion src/core/client_network_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ struct block_request_ctx
uint32_t *depths; /**< Backfill depth per root */
size_t root_count; /**< Number of roots requested */
const char *protocol_id; /**< Protocol ID string */
bool tried_fallback; /**< True if fallback protocol already attempted */
};


Expand Down
122 changes: 8 additions & 114 deletions src/core/client_reqresp_blocks.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,6 @@ static void block_request_ctx_free(struct block_request_ctx *ctx)
free(ctx);
}

static void write_u32_le(uint8_t *out, uint32_t value)
{
if (!out)
{
return;
}
out[0] = (uint8_t)(value & 0xffu);
out[1] = (uint8_t)((value >> 8) & 0xffu);
out[2] = (uint8_t)((value >> 16) & 0xffu);
out[3] = (uint8_t)((value >> 24) & 0xffu);
}

/* ============================================================================
* Block Chunk Processing
* ============================================================================ */
Expand Down Expand Up @@ -349,15 +337,7 @@ static void *block_request_worker(void *arg)
}

size_t roots_bytes = request.roots.length * LANTERN_ROOT_SIZE;
bool use_container = false;
if (ctx->protocol_id
&& LANTERN_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK_ID
&& strcmp(ctx->protocol_id, LANTERN_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK_ID) != 0)
{
use_container = true;
}

size_t raw_size = roots_bytes + (use_container ? 4u : 0u);
size_t raw_size = roots_bytes;
raw_request = (uint8_t *)malloc(raw_size > 0 ? raw_size : 1u);
if (!raw_request)
{
Expand All @@ -369,44 +349,14 @@ static void *block_request_worker(void *arg)
}

size_t raw_written = 0;
if (use_container)
{
if (raw_size < 4u)
{
lantern_log_error(
"reqresp",
&meta,
"failed to size blocks_by_root request");
goto cleanup;
}
write_u32_le(raw_request, 4u);
size_t roots_written = 0;
if (lantern_network_blocks_by_root_request_encode(
&request,
raw_request + 4u,
raw_size - 4u,
&roots_written)
!= 0)
{
lantern_log_error(
"reqresp",
&meta,
"failed to encode blocks_by_root request");
goto cleanup;
}
raw_written = 4u + roots_written;
}
else
if (lantern_network_blocks_by_root_request_encode(&request, raw_request, raw_size, &raw_written) != 0
|| raw_written == 0)
{
if (lantern_network_blocks_by_root_request_encode(&request, raw_request, raw_size, &raw_written) != 0
|| raw_written == 0)
{
lantern_log_error(
"reqresp",
&meta,
"failed to encode blocks_by_root request");
goto cleanup;
}
lantern_log_error(
"reqresp",
&meta,
"failed to encode blocks_by_root request");
goto cleanup;
}

size_t max_payload = 0;
Expand Down Expand Up @@ -652,56 +602,6 @@ static void *block_request_worker(void *arg)
* Stream Open Callback
* ============================================================================ */

static bool block_request_try_fallback(
struct block_request_ctx *ctx,
int err,
const struct lantern_log_metadata *meta)
{
if (!ctx || ctx->tried_fallback)
{
return false;
}
if (err != LIBP2P_ERR_UNSUPPORTED && err != LIBP2P_ERR_PROTO_NEGOTIATION_FAILED)
{
return false;
}
if (!ctx->client || !ctx->client->network.host)
{
return false;
}
const char *fallback = LANTERN_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK_ID;
if (!fallback || !ctx->protocol_id || strcmp(fallback, ctx->protocol_id) == 0)
{
return false;
}

ctx->tried_fallback = true;
ctx->protocol_id = fallback;
lantern_log_warn(
"reqresp",
meta,
"retrying blocks_by_root with fallback protocol=%s err=%d",
ctx->protocol_id,
err);

int rc = libp2p_host_open_stream_async(
ctx->client->network.host,
&ctx->peer_id,
ctx->protocol_id,
block_request_on_open,
ctx);
if (rc == 0)
{
return true;
}
lantern_log_warn(
"reqresp",
meta,
"fallback blocks_by_root open failed rc=%d",
rc);
return false;
}

/**
* Callback when a block request stream opens.
*
Expand Down Expand Up @@ -738,7 +638,6 @@ static void block_request_on_open(libp2p_stream_t *stream, void *user_data, int
"block request stream opened protocol=%s err=%d",
ctx->protocol_id ? ctx->protocol_id : "(unknown)",
err);

if (err != 0 || !stream)
{
lantern_log_warn(
Expand All @@ -752,10 +651,6 @@ static void block_request_on_open(libp2p_stream_t *stream, void *user_data, int
libp2p_stream_free(stream);
stream = NULL;
}
if (block_request_try_fallback(ctx, err, &meta))
{
return;
}
if (ctx->client)
{
lantern_client_on_blocks_request_complete_batch(
Expand Down Expand Up @@ -892,7 +787,6 @@ static int schedule_blocks_request_batch(
ctx->client = client;
ctx->root_count = root_count;
ctx->protocol_id = LANTERN_BLOCKS_BY_ROOT_PROTOCOL_ID;
ctx->tried_fallback = false;
strncpy(ctx->peer_text, peer_id_text, sizeof(ctx->peer_text) - 1);
ctx->peer_text[sizeof(ctx->peer_text) - 1] = '\0';

Expand Down
37 changes: 30 additions & 7 deletions src/core/client_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,30 @@ void persist_anchor_block(
block->state_root = anchor_block->state_root;

LanternRoot computed_root;
const LanternRoot *root_for_vote = anchor_root;
const LanternRoot *root_to_log = anchor_root;
if (!root_to_log)
if (!root_for_vote)
{
if (lantern_hash_tree_root_block(block, &computed_root) == 0)
{
root_to_log = &computed_root;
root_for_vote = &computed_root;
root_to_log = root_for_vote;
}
}

if (root_for_vote)
{
LanternVote *vote = &stored_anchor.message.proposer_attestation;
LanternCheckpoint anchor_checkpoint = {
.root = *root_for_vote,
.slot = block->slot,
};
vote->validator_id = block->proposer_index;
vote->slot = block->slot;
vote->head = anchor_checkpoint;
vote->target = anchor_checkpoint;
vote->source = anchor_checkpoint;
}
char root_hex[ROOT_HEX_BUFFER_LEN];
root_hex[0] = '\0';
if (root_to_log)
Expand Down Expand Up @@ -674,14 +690,21 @@ int restore_persisted_blocks(struct lantern_client *client)
for (size_t i = 0; i < list.length; ++i)
{
const struct lantern_persisted_block *entry = &list.items[i];
const LanternBlock *block = &entry->block.message.block;
const LanternVote *vote = &entry->block.message.proposer_attestation;
LanternSignedVote persisted_proposer;
memset(&persisted_proposer, 0, sizeof(persisted_proposer));
persisted_proposer.data = entry->block.message.proposer_attestation;
persisted_proposer.signature = entry->block.signatures.proposer_signature;
const LanternSignedVote *proposer_ptr = NULL;
if (vote->slot == block->slot && vote->validator_id == block->proposer_index)
{
memset(&persisted_proposer, 0, sizeof(persisted_proposer));
persisted_proposer.data = *vote;
persisted_proposer.signature = entry->block.signatures.proposer_signature;
proposer_ptr = &persisted_proposer;
}
if (lantern_fork_choice_add_block(
&client->fork_choice,
&entry->block.message.block,
&persisted_proposer,
block,
proposer_ptr,
&client->state.latest_justified,
&client->state.latest_finalized,
&entry->root)
Expand Down
28 changes: 0 additions & 28 deletions src/networking/reqresp_service.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ static void lantern_reqresp_service_clear(struct lantern_reqresp_service *servic
service->callbacks.collect_blocks = NULL;
service->status_server = NULL;
service->blocks_server = NULL;
service->blocks_server_legacy = NULL;
service->event_subscription = NULL;
}

Expand Down Expand Up @@ -136,9 +135,6 @@ void lantern_reqresp_service_reset(struct lantern_reqresp_service *service) {
if (service->blocks_server && host) {
(void)libp2p_host_unlisten(host, service->blocks_server);
}
if (service->blocks_server_legacy && host) {
(void)libp2p_host_unlisten(host, service->blocks_server_legacy);
}
lantern_reqresp_service_clear(service);
}

Expand Down Expand Up @@ -2630,10 +2626,6 @@ static void blocks_on_open_primary(libp2p_stream_t *stream, void *user_data) {
blocks_on_open_impl(stream, user_data, LANTERN_BLOCKS_BY_ROOT_PROTOCOL_ID);
}

static void blocks_on_open_legacy(libp2p_stream_t *stream, void *user_data) {
blocks_on_open_impl(stream, user_data, LANTERN_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK_ID);
}

int lantern_reqresp_service_start(
struct lantern_reqresp_service *service,
const struct lantern_reqresp_service_config *config) {
Expand All @@ -2658,7 +2650,6 @@ int lantern_reqresp_service_start(
status_def.user_data = service;

const char *blocks_protocol_primary = LANTERN_BLOCKS_BY_ROOT_PROTOCOL_ID;
const char *blocks_protocol_fallback = LANTERN_BLOCKS_BY_ROOT_PROTOCOL_FALLBACK_ID;

libp2p_protocol_def_t blocks_def;
memset(&blocks_def, 0, sizeof(blocks_def));
Expand All @@ -2676,25 +2667,6 @@ int lantern_reqresp_service_start(
return -1;
}

if (blocks_protocol_fallback
&& blocks_protocol_primary
&& strcmp(blocks_protocol_fallback, blocks_protocol_primary) != 0) {
libp2p_protocol_def_t blocks_legacy_def;
memset(&blocks_legacy_def, 0, sizeof(blocks_legacy_def));
blocks_legacy_def.protocol_id = blocks_protocol_fallback;
blocks_legacy_def.read_mode = LIBP2P_READ_PULL;
blocks_legacy_def.on_open = blocks_on_open_legacy;
blocks_legacy_def.user_data = service;
if (libp2p_host_listen_protocol(
service->host,
&blocks_legacy_def,
&service->blocks_server_legacy)
!= 0) {
lantern_reqresp_service_reset(service);
return -1;
}
}

lantern_log_info(
"network",
&(const struct lantern_log_metadata){0},
Expand Down