Commit 5f8d9977 authored by Jenda's avatar Jenda

comments everywhere

parent 311024ad
......@@ -10,5 +10,5 @@ c2s_pb2.py:
clean:
$(MAKE) -C modes clean
rm c2s_pb2.py c2s_pb2.pyc
rm -f c2s_pb2.py c2s_pb2.pyc
package c2s;
/* This files specifies all Kukuruku protocol messages except the raw
* payload message, which is specified in client_parser.h. */
enum special_payload_type {
SPECTRUM = 1;
HISTO = 2;
}
package c2s;
// Format of samples we want to receive
enum sample_type {
F32 = 1;
I16 = 2;
I8 = 3;
F32 = 1; // float32
I16 = 2; // int16_t
I8 = 3; // int8_t
}
enum command_type {
DUMPBUFFER = 1;
RECORD_START = 2;
RECORD_STOP = 19;
CREATE_XLATER = 3;
LIST_XLATERS = 4;
DESTROY_XLATER = 5;
ENABLE_XLATER = 6;
DISABLE_XLATER = 7;
SET_GAIN = 8;
RETUNE = 9;
SET_PPM = 10;
SET_HISTO_FFT = 11;
SET_RATE = 12;
ENABLE_SPECTRUM = 13;
DISABLE_SPECTRUM = 14;
ENABLE_HISTO = 15;
DISABLE_HISTO = 16;
GET_INFO = 17;
MODIFY_XLATER = 18;
PAYLOAD = 256;
DUMPED = 257;
RUNNING_XLATER = 258;
INFO = 259;
DESTROYED_XLATER = 260;
}
/* Messages in the directions client -> server are prefixed CLI_
* Messages from the server to the client are prefixed SRV_
*/
// Create xlater on the server
message CLI_CREATE_XLATER {
required int32 remoteid = 1;
required float rotate = 2;
required int32 remoteid = 1; // client reference ID
required float rotate = 2; // rotator (the signal is multiplied by e^(rotator*sample))
required int32 decimation = 3;
required int32 startframe = 4;
repeated float taps = 5;
required int32 startframe = 4; // set to ID of frame or -1 if you don't want history
repeated float taps = 5; // filter coefficients
}
// Subscribe xlater
message CLI_ENABLE_XLATER {
required int32 id = 1;
required sample_type type = 2;
}
// Unsubscribe
message CLI_DISABLE_XLATER {
required int32 id = 1;
}
// Modify xlater
message CLI_MODIFY_XLATER {
required int32 localid = 1;
required float rotate = 2;
repeated float newtaps = 3;
required float rotate = 2; // mandatory
repeated float newtaps = 3; // optional -- if not set, taps are not changed
}
// Completely delete an xlater on server
message CLI_DESTROY_XLATER {
required int32 id = 1;
}
/* Record SDR baseband to cfile (server-side) in the current working directory.
* Additionally, a .txt file is created containing metadata.
* The .txt file contains lines of the following format:
* block <size in bytes> time <unix timestamp> freq <frequency>
*/
message CLI_RECORD_START {
required int32 startframe = 1;
required int32 stopframe = 2;
required int32 startframe = 1; // first frame ID of the recording, -1 for current
required int32 stopframe = 2; // last frame ID if the recording
// if you want to record indefinitely, set stopframe to INT_MAX
// and to stop recording just set both to negative values
}
// Set SDR gain, these four fields correspond to the osmosdr ones and are mostly hardware-dependent.
message CLI_SET_GAIN {
required int32 autogain = 1;
required int32 global_gain = 2;
......@@ -78,48 +66,42 @@ message CLI_SET_GAIN {
required int32 bb_gain = 4;
}
// Set SDR frequency
message CLI_RETUNE {
required int64 freq = 1;
}
// Set SDR clock correction
message CLI_SET_PPM {
required int32 ppm = 1;
}
message CLI_SET_FFT {
required int32 fftsize = 1;
required int32 decim = 2;
}
// Server announces that an xlater has been created
message SRV_RUNNING_XLATER {
required int32 remoteid = 1;
required int32 id = 2;
required int32 remoteid = 1; // client-submitted ID, or -1 for clients that did not create the xlater
required int32 id = 2; // server-assigned ID, use this for any further references to the xlater
required float rotate = 3;
required int32 decimation = 4;
}
message SRV_PAYLOAD {
required int32 id = 1;
required int32 time = 2;
required int32 frameno = 3;
required sample_type type = 4;
repeated float samples = 5 [packed=true];
}
// Server info, generated on every change
message SRV_INFO {
required int32 samplerate = 1;
required int64 frequency = 2;
required int32 samplerate = 1; // SDR sample rate
required int64 frequency = 2; // frequency the SDR is tuned to
required int32 ppm = 3;
required int32 fftw = 4;
required int32 fftw = 4; // size of spectrum
required int32 autogain = 5;
required int32 global_gain = 6;
required int32 if_gain = 7;
required int32 bb_gain = 8;
required int32 packetlen = 9;
required int32 bufsize = 10;
required int32 maxtaps = 11;
required int32 packetlen = 9; // SDRPACKETSIZE (see constants.h)
required int32 bufsize = 10; // BUFSIZE (see constants.h)
required int32 maxtaps = 11; // MAXTAPS (see constants.h)
}
// Server announces an xlater has been deleted
message SRV_DESTROYED_XLATER {
required int32 id = 1;
}
......@@ -9,16 +9,16 @@
int parse_client_req(tcp_cli_t *, const uint8_t *, int32_t);
enum special_payload_type {
SPECTRUM = -1,
HISTO = -2,
SPECTRUM = -1, // this payload containt spectrum measurements
HISTO = -2, // this payload contains histogram
// any nonnegative number -> this payload contains data for that xlater
};
typedef enum command_type {
DUMPBUFFER = 1,
RECORD_START = 2,
RECORD_STOP = 19,
CREATE_XLATER = 3,
LIST_XLATERS = 4,
LIST_XLATERS = 4, // tells the server to send SRV_RUNNING_XLATER for all xlaters
DESTROY_XLATER = 5,
ENABLE_XLATER = 6,
DISABLE_XLATER = 7,
......@@ -27,11 +27,11 @@ typedef enum command_type {
SET_PPM = 10,
SET_HISTO_FFT = 11,
SET_RATE = 12,
ENABLE_SPECTRUM = 13,
ENABLE_SPECTRUM = 13, // tells the server we want to receive spectrum measurements
DISABLE_SPECTRUM = 14,
ENABLE_HISTO = 15,
DISABLE_HISTO = 16,
GET_INFO = 17,
GET_INFO = 17, // request SRV_INFO
MODIFY_XLATER = 18,
PAYLOAD = 256,
......@@ -39,21 +39,27 @@ typedef enum command_type {
RUNNING_XLATER = 258,
INFO = 259,
DESTROYED_XLATER = 260,
} command_type;
// Now the payload message, the only one that is not protobuf'd (for efficiency reasons)
struct __attribute__ ((__packed__)) SRV_PAYLOAD_HEADER {
command_type t; // set to PAYLOAD
int32_t id; // xlater ID or SPECTRUM or HISTO
int32_t time; // timestamp when this has been recorded
int32_t frameno; // frame number of this frame
sample_type type; // format of samples
};
/* Now specification of osmosdr control interface (server sends this to the pipe given by -i parameter)
*
* Every time, a header is sent and then specific payload follows
*/
typedef enum sdr_iface {
SDR_IFACE_TUNE = 1,
SDR_IFACE_PPM = 2,
SDR_IFACE_GAIN = 3,
SDR_IFACE_TUNE = 1, // int64_t follows
SDR_IFACE_PPM = 2, // int32_t follows
SDR_IFACE_GAIN = 3, // 4 int32_ts follow
} sdr_iface;
struct __attribute__ ((__packed__)) SRV_PAYLOAD_HEADER {
command_type t;
int32_t id;
int32_t time;
int32_t frameno;
sample_type type;
};
#endif
......@@ -2,21 +2,34 @@
#ifndef CONSTANTS_H
#define CONSTANTS_H
/* Number of samples to read each time we read from SDR
* (so it is this*sizeof(complex64) bytes)
* You want to set this to something between 128 and 1024 Ki.
*/
#define SDRPACKETSIZE (512*1024)
#define BUFSIZE 64
#define MAXTAPS 4096
#define COMPLEX 2
/* Number of packets to keep.
* So we will allocate SDRPACKETSIZE*8*BUFSIZE bytes of memory. */
#define BUFSIZE 128
#define MEGA 1000000
#define GIGA 1000000000
/* The longest allowed filter */
#define MAXTAPS 4096
/* Size of spectrum transform. Increase this for better resolution on wideband SDRs. */
#define FFTSIZE 1024
#define WRITE_FRAMES_SYNC 2
#define HISTOGRAM_RES 256
#define COMPLEX 2
#define MEGA 1000000
#define GIGA 1000000000
#define MIN(a,b) (((a)<(b))?(a):(b))
#define MAX(a,b) (((a)>(b))?(a):(b))
#define CLAMP(x, lower, upper) (MIN(upper, MAX(x, lower)))
......
......@@ -57,6 +57,8 @@ void fftw_init(int N) {
* spp - how many waterfall lines to compute
* fftskip - distance between beginning of transforms (we don't usually compute
* side-by-side or even overlapping FFT, but rather sample the signal once a while)
*
* What is exacly calculated? 10log10((Re^2 + Im^2)/N) for the unnormalized FFT with Hamming window.
*/
void calc_spectrum(sdr_packet * pkt, int spp, int fftskip) {
if(fftw_window == NULL) {
......@@ -88,27 +90,25 @@ void calc_spectrum(sdr_packet * pkt, int spp, int fftskip) {
/* Transform */
fftwf_execute(p);
/* FFTW computes DFT without scaling by 1/sqrt(N) */
float dftscale = 1/sqrtf(fftsize);
/* Read output */
float * fout = (float *) fftw_out;
for(int k = 0; k<fftsize; k++) {
float v1 = fout[COMPLEX*k];
float v2 = fout[COMPLEX*k+1];
/* We are computing sum(log(hypot(i,q))).
* We can compute log(prod(hypot(i,q))) and save computationally intesive logarithms */
/* gah, numerically unstable! probably can be implemented by splitting the float with
/* We are computing sum(log(something))).
* We can compute log(prod(something)) and save computationally intesive logarithms.
* However, this seems to be numerically unstable (you multiply small numbers until
* you end up with 0). Probably can be implemented by splitting the float with
* frexp(3) and computing exponent separately...
if(j == 0) {
fftw_avg[k] = hypotf(v1, v2);
fftw_avg[k] = (v1*v1 + v2*v2);
} else {
fftw_avg[k] *= hypotf(v1, v2);
fftw_avg[k] *= (v1*v1 + v2*v2);
}*/
fftw_avg[k] += log10f((v1*v1 + v2*v2)/fftsize);
//fftw_avg[k] += logf(hypotf(v1, v2) * dftscale);
}
......
#ifndef SAMPLE_TYPE_H
#define SAMPLE_TYPE_H
// Sample format enum
typedef enum {
F32 = 1,
I16 = 2,
......
......@@ -2,14 +2,17 @@
#define SDR_PACKET_H
#include <inttypes.h>
// One packet that we read from SDR
typedef struct sdr_packet {
int64_t frequency;
int timestamp;
int32_t frameno;
float * data;
float * spectrum;
size_t spectrumsize;
uint16_t * histo;
float * data; // the actual data, SDRPACKET complex samples
float * spectrum; // spectrum computed with calc_spectrum
size_t spectrumsize; // size of spectrum (bytes)
uint16_t * histo; // histogram computed by calc_histogram
} sdr_packet;
#endif
......@@ -34,23 +34,37 @@
#include <netinet/in.h>
#include <netdb.h>
// The last frame ID that was read from SDR
int32_t sdr_cptr = -1;
// The last frame ID that was sent to all clients
int32_t send_cptr = -1;
// The last frame ID that has been recorded to disk (when we are recording the entire baseband)
int32_t rec_cptr = INT32_MIN;
// The frame ID where recording should stop
int32_t rec_stop = 0;
// path to file we are recording to (without ".cfile" and ".txt")
char * recpath;
// the input SDR buffer
sdr_packet sdr_inbuf[BUFSIZE];
// sdr_read_thr
pthread_t sdr_thread;
// socket_write_thr
pthread_t socket_thread;
// mutex guarding all linked lists
pthread_mutex_t llmutex = PTHREAD_MUTEX_INITIALIZER;
// mutex guarding manipulation with all buffers
pthread_mutex_t datamutex = PTHREAD_MUTEX_INITIALIZER;
// condition on which workers wait when there is no work to be done
pthread_cond_t datacond = PTHREAD_COND_INITIALIZER;
// list of all TCP clients
extern SLIST_HEAD(tcp_cli_head_t, tcp_cli_t) tcp_cli_head;
// osmosdr-input.py control and data interface
char * sdr_cmd_file;
FILE * sdr_cmd;
char * sdr_pipe_file;
......@@ -62,6 +76,7 @@ int32_t ppm = INT32_MIN;
int32_t fftw = FFTSIZE;
struct current_gain_t gain;
// list of all workers
SLIST_HEAD(worker_head_t, worker) worker_head = SLIST_HEAD_INITIALIZER(worker_head);
// Allocate SDR input buffer
......@@ -76,6 +91,7 @@ static void allocate_sdr_buf() {
}
}
// This thread reads data from SDR, puts them to sdr_inbuf and also records them to file when enabled.
static void * sdr_read_thr(void * a) {
// open the pipe we are reading from
sdr_pipe = fopen(sdr_pipe_file, "r");
......@@ -98,9 +114,11 @@ static void * sdr_read_thr(void * a) {
char * cfilepath;
char * metapath;
// data file (cfile)
if(asprintf(&cfilepath, "%s.cfile", recpath) == -1) {
err(EXIT_FAILURE, "asprintf");
}
// metadata file (txt)
if(asprintf(&metapath, "%s.txt", recpath) == -1) {
err(EXIT_FAILURE, "asprintf");
}
......@@ -140,7 +158,7 @@ static void * sdr_read_thr(void * a) {
free(metapath);
}
// We wish to write packet sdr_cptr+1.
// We wish to put packet sdr_cptr+1 to buffer.
if(!locked) {
pthread_mutex_lock(&datamutex);
......@@ -172,7 +190,7 @@ static void * sdr_read_thr(void * a) {
sdr_inbuf[base].timestamp = time(0);
sdr_inbuf[base].frameno = sdr_cptr;
sdr_inbuf[base].frequency = frequency;
calc_spectrum(&(sdr_inbuf[base]), 1, 10240);
calc_spectrum(&(sdr_inbuf[base]), 1, SDRPACKETSIZE/64);
calc_histogram(&(sdr_inbuf[base]), 65535);
// signal that new data are available
......@@ -183,16 +201,19 @@ static void * sdr_read_thr(void * a) {
}
}
// This thread reads filtered channels from workers and writes them to TCP clients.
static void * socket_write_thr(void * a) {
while(1) {
// We wish to read packet send_cptr+1
// We wish to read packet send_cptr+1 from buffer and write it to sockets
pthread_mutex_lock(&llmutex);
tcp_cli_t * client;
bool willwait = true;
// projdu všechny workery, zapisuju dokud jejich last_written > send_cptr, send_cptr++
// We go through all workers and until last_written > send_cptr (see explanation in
// worker.h), we take the data and send them to clients; send_cptr++
// All this seems a bit tricky with minimal locking and synchronous I/O...
worker * w;
worker * finished = NULL;
SLIST_FOREACH(w, &worker_head, next) {
......@@ -269,7 +290,9 @@ static void * socket_write_thr(void * a) {
} else { // not enabled, awaiting destruction ... check for thread still alive
int ret = pthread_kill(w->thr, 0);
if(ret == ESRCH) { // free
if(ret == ESRCH) { // free and join it
//printf("joining\n");
pthread_join(w->thr, NULL);
finished = w;
SLIST_REMOVE(&worker_head, w, worker, next);
break;
......@@ -277,6 +300,7 @@ static void * socket_write_thr(void * a) {
}
}
// this is because we can't continue processing the linked list if we free one of its item
if(finished != NULL) {
free(finished);
willwait = false;
......@@ -284,7 +308,10 @@ static void * socket_write_thr(void * a) {
continue;
}
// podívám se na globální send_cptr, zapisuju metadata dokud sdr_cptr > send_cptr, send_cptr++
// Now take care about metadada (spectrum, histogram).
// We check the global send_cptr, and then write metadata until sdr_cptr > send_cptr.
pthread_mutex_lock(&datamutex);
int _sdr_cptr = sdr_cptr;
int _send_cptr = send_cptr;
......@@ -346,7 +373,7 @@ static void * socket_write_thr(void * a) {
pthread_mutex_unlock(&llmutex);
// pokud jsem nic nezapsal, čekám na cond
// If we have written nothing, wait.
if(willwait) {
poll(NULL, 0, 10);
}
......
......@@ -8,21 +8,24 @@
void network_listener(char*, char*);
// One chainlink of a linked list describing subscribed xlaters (tcp_cli_t.req_frames_head)
typedef struct req_frames {
int wid;
sample_type sampletype;
SLIST_ENTRY(req_frames) next;
} req_frames;
// One chainlink of a linked list with info about TCP client.
typedef struct tcp_cli_t {
int fd;
pthread_t thr;
bool spectrum;
bool histo;
int fd; // socket fd
pthread_t thr; // client_read_thr
bool spectrum; // do we send spectrum measurements to this client?
bool histo; // do we send histogram measurements to this client?
SLIST_HEAD(req_frames_head, req_frames) req_frames_head;
SLIST_HEAD(req_frames_head, req_frames) req_frames_head; // linked list of subscribed xlaters
SLIST_ENTRY(tcp_cli_t) next;
SLIST_ENTRY(tcp_cli_t) next; // queue(3) next item
} tcp_cli_t;
#endif
......@@ -10,21 +10,59 @@ typedef struct worker_packet {
char * data;
} worker_packet;
/* One chainlink of a linked list with all workers we have spawned.
* Currently we have only xlater_worker, but the goal was to have workers
* doing other things (okay, maybe a special worker that uses FFT overlap-add
* to accelerate convolution).
*/
typedef struct worker {
int wid;
bool enabled;
pthread_t thr;
int wid; // worker id
bool enabled; // set to false to make the worker gracefully exit
pthread_t thr; // (xlate_)worker_thr
float rotate;
int decim;
/* The worker reads SDR packets from sdr_inbuf and writes them to outbuf.
* Both are ring-buffers accessed % BUFSIZE
* send_cptr and last_written are used to show where in the stream we are.
* send_cptr - ID of the last frame which has been sent to all clients.
* so all frames with lower ID are free
* last_written - ID of the last frame the worker has processed. So all frames
* between send_cptr+1 and last_written are ready to be sent to TCP clients.
-+------+------+------+------+------+------+------+------+-
...| | | | | | | | |... %BUFSIZE
-+------+------+------+------+------+------+------+------+-
free frames ^ ready frames ^ future frames
send_cptr last_written
*/
int32_t last_written;
int32_t send_cptr;
/* Changing taps at runtime:
- lock datamutex
- malloc newtaps, memcpy
- set newtapslen
- unlock datamutex
- both old taps and newtaps will be freed by the worker
*/
float * taps;
float * newtaps;
float maxval;
int tapslen;
float maxval; // maximum value of a sample the worker can produce given it is fed with samples
// from range [-1, 1]. Usually about 1.5. Used when scaling samples to integer types.
// The value is set by calc_max_amplitude by create_xlate_worker or by the xlater itself
// when taps are changed.
int tapslen; // Ntaps (i.e. bytes/sizeof(float32))
int newtapslen;
// Maximum size of one filtered and decimated frame.
// It can actually be one sample shorter if SDRPACKETSIZE is not divisible by decimation.
size_t maxoutsize;
int32_t remoteid;
int32_t remoteid; // client-set ID (probably not needed here)
worker_packet outbuf[BUFSIZE];
SLIST_ENTRY(worker) next;
} worker;
......
......@@ -55,6 +55,7 @@ float * get_complex_taps(float * taps, int tapslen, float rotate) {
return ctaps;
}
/* history .. the frame it should start with (or -1 for current frame) */
worker * create_xlate_worker(float rotate, int decim, int history, float * taps, int tapslen) {
worker * w = calloc(1, sizeof(worker));
......@@ -110,6 +111,14 @@ void * xlate_worker_thr(void *ptr) {
int32_t mypos = ctx->last_written + 1;
int fir_offset = 0;
/* To avoid hassle with complicated FIR evaluation, we allocate *alldata bigger
* and copy the end of previous frame to the beginning of it, so we can compute
* the filter simply as sum(x_{n-j}*h_{j}) and don't care about overflows.
*
* The interesting fact is that if SDRPACKETSIZE is not divisible by decimation,
* the filtered frames will have different lengths.
*/
float * alldata = calloc(sizeof(float), (SDRPACKETSIZE + MAXTAPS) * COMPLEX);
float * firout = calloc(1, ctx->maxoutsize);
lv_32fc_t phase_inc, phase;
......@@ -133,6 +142,7 @@ void * xlate_worker_thr(void *ptr) {
int outsample = 0;
int i;
// evaluate the filter
for(i = fir_offset; i<SDRPACKETSIZE; i+=ctx->decim) {
lv_32fc_t* dst = (lv_32fc_t*) (firout + outsample*COMPLEX);
volk_32fc_x2_dot_prod_32fc(dst,
......@@ -140,10 +150,14 @@ void * xlate_worker_thr(void *ptr) {
(lv_32fc_t*)(ctx->taps), ctx->tapslen); // filter
outsample++;
}
// rotator