summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Ribas <jonathan.ribas@fraudbuster.mobi>2019-01-26 20:59:39 +0100
committerJonathan Ribas <jonathan.ribas@fraudbuster.mobi>2019-01-26 21:11:25 +0100
commit8a083840579e1b6fff067107b9fbc6353bf724e8 (patch)
treed25aa9dea79f02b5abf2fedd6db3102c82ab143d
parentc954761dc5dbe7a311d379c01626d84984b51bfe (diff)
downloaddpdk-burst-replay-8a083840579e1b6fff067107b9fbc6353bf724e8.zip
dpdk-burst-replay-8a083840579e1b6fff067107b9fbc6353bf724e8.tar.gz
dpdk-burst-replay-8a083840579e1b6fff067107b9fbc6353bf724e8.tar.xz
Add the pcap timestamps to the threads ctx, duplicate the tx_thread func to
have two different implementations (one for topspeed, and one to respect timers).
-rw-r--r--src/dpdk.c95
-rw-r--r--src/main.h1
2 files changed, 95 insertions, 1 deletions
diff --git a/src/dpdk.c b/src/dpdk.c
index ad87446..0b70827 100644
--- a/src/dpdk.c
+++ b/src/dpdk.c
@@ -278,6 +278,97 @@ double timespec_diff_to_double(const struct timespec start, const struct timespe
return (duration);
}
+int tx_topspeed_thread(void* thread_ctx)
+{
+ struct thread_ctx* ctx;
+ struct rte_mbuf** mbuf;
+ struct timespec start, end;
+ unsigned int tx_queue;
+ int ret, thread_id, index, i, run_cpt, retry_tx;
+ int nb_sent, to_sent, total_to_sent, total_sent;
+ int nb_drop;
+
+ if (!thread_ctx)
+ return (EINVAL);
+
+ /* retrieve thread context */
+ ctx = (struct thread_ctx*)thread_ctx;
+ thread_id = ctx->tx_port_id;
+ mbuf = ctx->pcap_cache->mbufs;
+#ifdef DEBUG
+ printf("Starting thread %i.\n", thread_id);
+#endif
+
+ /* init semaphore to wait to start the burst */
+ ret = sem_wait(ctx->sem);
+ if (ret) {
+ fprintf(stderr, "sem_wait failed on thread %i: %s\n",
+ thread_id, strerror(ret));
+ return (ret);
+ }
+
+ /* get the start time */
+ ret = clock_gettime(CLOCK_MONOTONIC, &start);
+ if (ret) {
+ fprintf(stderr, "clock_gettime failed on start for thread %i: %s\n",
+ thread_id, strerror(errno));
+ return (errno);
+ }
+
+ /* iterate on each wanted runs */
+ for (run_cpt = ctx->nbruns, tx_queue = ctx->total_drop = ctx->total_drop_sz = 0;
+ run_cpt;
+ ctx->total_drop += nb_drop, run_cpt--) {
+ /* iterate on pkts for every batch of BURST_SZ number of packets */
+ for (total_to_sent = ctx->nb_pkt, nb_drop = 0, to_sent = min(BURST_SZ, total_to_sent);
+ to_sent;
+ total_to_sent -= to_sent, to_sent = min(BURST_SZ, total_to_sent)) {
+ /* calculate the mbuf index for the current batch */
+ index = ctx->nb_pkt - total_to_sent;
+
+ /* send the burst batch, and retry NB_RETRY_TX times if we */
+ /* didn't success to sent all the wanted batch */
+ for (total_sent = 0, retry_tx = NB_RETRY_TX;
+ total_sent < to_sent && retry_tx;
+ total_sent += nb_sent, retry_tx--) {
+ nb_sent = rte_eth_tx_burst(ctx->tx_port_id,
+ (tx_queue++ % NB_TX_QUEUES),
+ &(mbuf[index + total_sent]),
+ to_sent - total_sent);
+ if (retry_tx != NB_RETRY_TX &&
+ tx_queue % NB_TX_QUEUES == 0)
+ usleep(100);
+ }
+ /* free unseccessfully sent */
+ if (unlikely(!retry_tx))
+ for (i = total_sent; i < to_sent; i++) {
+ nb_drop++;
+ ctx->total_drop_sz += mbuf[index + i]->pkt_len;
+ rte_pktmbuf_free(mbuf[index + i]);
+ }
+ }
+#ifdef DEBUG
+ if (unlikely(nb_drop))
+ printf("[thread %i]: on loop %i: sent %i pkts (%i were dropped).\n",
+ thread_id, ctx->nbruns - run_cpt, ctx->nb_pkt, nb_drop);
+#endif /* DEBUG */
+ }
+
+ /* get the ends time and calculate the duration */
+ ret = clock_gettime(CLOCK_MONOTONIC, &end);
+ if (ret) {
+ fprintf(stderr, "clock_gettime failed on finish for thread %i: %s\n",
+ thread_id, strerror(errno));
+ return (errno);
+ }
+ ctx->duration = timespec_diff_to_double(start, end);
+#ifdef DEBUG
+ printf("Exiting thread %i properly.\n", thread_id);
+#endif /* DEBUG */
+ return (0);
+}
+
+/* TODO: modify it to respect each pkt timestamp */
int tx_thread(void* thread_ctx)
{
struct thread_ctx* ctx;
@@ -435,11 +526,13 @@ int start_tx_threads(const struct cmd_opts* opts,
ctx[i].pcap_cache = &(dpdk->pcap_caches[i]);
ctx[i].nb_pkt = pcap->nb_pkts;
ctx[i].nb_tx_queues = NB_TX_QUEUES;
+ ctx[i].pkts_ts = dpdk->pkts_ts;
}
/* launch threads, which will wait on the semaphore to start */
for (i = 0; i < cpus->nb_needed_cpus; i++) {
- ret = rte_eal_remote_launch(tx_thread, &(ctx[i]),
+ ret = rte_eal_remote_launch((opts->topspeed ? tx_topspeed_thread : tx_thread),
+ &(ctx[i]),
cpus->cpus_to_use[i + 1]); /* skip fake master core */
if (ret) {
fprintf(stderr, "rte_eal_remote_launch failed: %s\n", strerror(ret));
diff --git a/src/main.h b/src/main.h
index 7849693..4b9cf3d 100644
--- a/src/main.h
+++ b/src/main.h
@@ -87,6 +87,7 @@ struct thread_ctx {
unsigned int total_drop;
unsigned int total_drop_sz;
struct pcap_cache* pcap_cache;
+ struct pcap_pkt_ts* pkts_ts;
};
struct pcap_ctx {