Spectrum sort

From ActiveArchives
Jump to: navigation, search

Code used in Erkki Kurenniemi: In 2048 and Dataradio

All code on this page may be considered public domain.

DataradioSpectrumSortPlayers.png

Contents

Usage

# ./spectrumsortpipeline input.wav [output.wav]
# Tweak interval & bands (& freq) settings in call to gstspectrum
# Every stage reads CSV on stdin and writes CSV on stdout, except editwithsox,
# which prints sox commands that the final bash executes.
# Both arguments are quoted so paths containing spaces survive word splitting;
# output.wav is the same default editwithsox uses when it gets no argument.
./gstspectrum "${1:?usage: spectrumsortpipeline input.wav [output.wav]}" --interval 0.1 \
| ./selectmaxrange \
| ./sortmaxrange \
| ./compactinout \
| ./editwithsox "${2:-output.wav}" \
| bash

Data pipeline

DataradioSpectrumSortPipeline.png

gstspectrum

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import sys, argparse
 
# Command line: one positional wav path plus three optional tuning knobs.
parser = argparse.ArgumentParser(
    description="Dumps spectral analysis of audio using gstreamer")
parser.add_argument(
    '--freq', type=int, default=44100,
    help="audio frequency")
parser.add_argument(
    '--interval', type=float, default=0.1,
    help="interval duration (in seconds)")
parser.add_argument(
    '--bands', type=int, default=128,
    help="number of bands to split the spectrum")
parser.add_argument(
    'path',
    help="audio filename to analyze (wav format only)")
 
args = parser.parse_args()

# Fixed values.
NANOS = 1e9              # nanoseconds per second, as a float
GST_MESSAGE_EOS = 1      # bit 0; tested against message.type to detect end of stream

# Values taken from the command line.
inpath = args.path
AUDIOFREQ = args.freq    # value of --freq; band frequencies are laid out over 0..AUDIOFREQ/2
BANDS = args.bands       # number of bands the spectrum is split into
INTERVAL_SECS = args.interval
INTERVAL = int(INTERVAL_SECS * 10**9)  # length of one analysis frame, in nanoseconds
 
import gst, pygst, struct, gobject
 
# Pipeline description handed to gst.parse_launch: read the wav file, run it
# through the spectrum element and discard the audio itself.
# For live analysis the source can be swapped:
# listener_desc = 'autoaudiosrc ! spectrum interval={1} bands={2} ! fakesink'
listener_desc = (
    'filesrc location="{0}" ! wavparse ! spectrum interval={1} bands={2} ! fakesink'
).format(inpath, INTERVAL, BANDS)
 
def listener_on_message (bus, message, data):
    """Bus watch callback: print one CSV row per "spectrum" message.

    Each row is: quoted source path, frame start and end in seconds
    (3 decimals), then one magnitude per band (3 decimals).

    On end-of-stream the pipeline is stopped and the process exits with
    status 0.  On a pipeline error (for example an unreadable input file)
    the error is written to stderr and the process exits with status 1
    instead of waiting forever in the main loop.

    bus     -- the bus the message arrived on (unused)
    message -- the gst message to inspect
    data    -- user data given to add_watch (unused)

    Returns True so the watch stays installed.
    """
    if message.type & GST_MESSAGE_EOS:
        listener.set_state(gst.STATE_NULL)
        sys.exit(0)

    # Error messages carry a structure too, so they must be recognised by
    # type before looking at the structure name.
    if message.type == gst.MESSAGE_ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("gstspectrum: {0}: {1}\n".format(args.path, err))
        listener.set_state(gst.STATE_NULL)
        sys.exit(1)

    s = message.structure
    if s and s.get_name() == "spectrum":
        secs = s['timestamp'] / NANOS
        magnitudes = ",".join(["{0:0.3f}".format(x) for x in s['magnitude']])
        # Single-argument print(...) behaves the same as the print statement.
        print('"{0}",{1:0.3f},{2:0.3f},'.format(args.path, secs, secs + INTERVAL_SECS) + magnitudes)
    return True
 
# One frequency per band: (i + 0.5) band-widths into the 0..AUDIOFREQ/2 span.
freqs = [((AUDIOFREQ / 2.0) * band + AUDIOFREQ / 4.0) / BANDS for band in range(BANDS)]
 
# SIMPLE HEADERS
#print "source\tin\tout\t", "\t".join([str(x) for x in freqs])
 
# RANGE HEADERS
# Each label runs from the previous band's frequency (0 for the first band)
# up to this band's frequency, rounded to whole Hz.
lower_bounds = [0.0] + freqs[:-1]
labels = ['"({0:0.0f}-{1:0.0f})"'.format(low, high)
          for low, high in zip(lower_bounds, freqs)]
print('"source","in","out",' + ",".join(labels))
 
# Build the pipeline, route its bus messages to listener_on_message and
# start playback; the callback prints rows and ends the process at EOS.
listener = gst.parse_launch(listener_desc)
listener.get_bus().add_watch(listener_on_message, None)
mainloop = gobject.MainLoop()
listener.set_state(gst.STATE_PLAYING)
 
try:
    mainloop.run()
except KeyboardInterrupt:
    # Ctrl-C: stop the pipeline and end quietly.  Any other exception is
    # allowed to propagate so real failures are not hidden.
    listener.set_state(gst.STATE_NULL)

selectmaxrange

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import csv, sys
 
 
def select_max_range(infile, outfile):
    """Reduce every spectrum row to the label of its loudest band.

    Reads CSV from *infile*; the first non-blank row is the header.
    Columns whose header starts with "(" are band magnitudes; all other
    columns are copied through unchanged.  For each data row the band
    columns are replaced by one "maxrange" column holding the header label
    of the band with the largest magnitude (the first one on ties, None if
    there are no band columns).  Rows are written to *outfile* as they are
    read.  Blank lines are skipped.

    Raises ValueError if a band value is not a number.
    """
    writer = csv.writer(outfile)
    keys = None
    for row in csv.reader(infile):
        if not row:
            continue  # blank line: nothing to copy or compare
        if keys is None:
            keys = row
            outkeys = [k for k in keys if not k.startswith("(")]
            outkeys.append("maxrange")
            writer.writerow(outkeys)
            continue

        maxvalue = None
        maxkey = None
        outvals = []
        for key, value in zip(keys, row):
            if not key.startswith("("):
                outvals.append(value)
                continue
            value = float(value)
            if maxvalue is None or value > maxvalue:
                maxvalue, maxkey = value, key
        outvals.append(maxkey)
        writer.writerow(outvals)


if __name__ == "__main__":
    select_max_range(sys.stdin, sys.stdout)

sortmaxrange

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import csv, sys
 
def _parse_range(label):
    """Turn a "(low-high)" label into the list [low, high] of floats."""
    return [float(part) for part in label.strip("()").split("-")]


def _range_key(item):
    """Sort key for (freqrange, row): rows without a range sort first."""
    freqrange = item[0]
    return [] if freqrange is None else freqrange


def sort_max_range(infile, outfile):
    """Sort CSV rows by the numeric bounds of their "maxrange" column.

    Reads CSV from *infile* (first non-blank row is the header) and writes
    the header followed by the sorted rows to *outfile*.

    Only the frequency range is used as sort key and the sort is stable, so
    rows sharing a range keep their input order.  Using the whole row as a
    tie-breaker would compare the time columns as strings ("10.000" sorts
    before "2.000") and scramble the order within a band.

    Empty input produces no output.  Raises ValueError if a maxrange value
    cannot be parsed as numbers.
    """
    keys = None
    rows = []
    for row in csv.reader(infile):
        if not row:
            continue  # blank line
        if keys is None:
            keys = row
            continue
        freqrange = None
        for key, value in zip(keys, row):
            if key == "maxrange":
                freqrange = _parse_range(value)
        rows.append((freqrange, row))

    if keys is None:
        return  # no header at all: nothing to write

    rows.sort(key=_range_key)
    writer = csv.writer(outfile)
    writer.writerow(keys)
    for _freqrange, row in rows:
        writer.writerow(row)


if __name__ == "__main__":
    sort_max_range(sys.stdin, sys.stdout)

compactinout

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import sys, csv
 
"""
join contiguous ranges from in -- out
if any other column's values change, consider distinct (do not join / break)
"""
 
def compact_in_out(infile, outfile):
    """Merge consecutive rows whose in/out times touch into a single row.

    Reads CSV from *infile* (first non-blank row is the header) and writes
    the header plus the compacted rows to *outfile*.  Two neighbouring rows
    are joined when the second one's "in" equals the first one's "out" and
    every other column is identical; the joined row runs from the first
    "in" to the last "out".  The "in" and "out" cells are written as
    floats, all other cells are copied as read.

    Empty input produces no output.  Raises ValueError if an in/out value
    is not a number.
    """
    keys = None
    allrows = []
    for row in csv.reader(infile):
        if not row:
            continue  # blank line
        if keys is None:
            keys = row
            continue
        tin = tout = None
        othercols = []
        for key, value in zip(keys, row):
            if key == "in":
                tin = float(value)
            elif key == "out":
                tout = float(value)
            else:
                othercols.append(value)
        allrows.append((tin, tout, othercols))

    if keys is None:
        return  # no header at all: nothing to write

    writer = csv.writer(outfile)
    writer.writerow(keys)
    curstart = None
    for i, (start, end, cols) in enumerate(allrows):
        if curstart is None:
            curstart = start
        if i + 1 < len(allrows):
            next_start, _next_end, next_cols = allrows[i + 1]
            # Compare against None explicitly: a start time of 0.0 is a
            # valid value and must not be mistaken for "no next row".
            if next_start is not None and next_start == end and next_cols == cols:
                continue  # still inside a contiguous run; keep extending
        # End of a run: emit it in the header's column order.
        remaining = iter(cols)
        outvals = []
        for key in keys:
            if key == "in":
                outvals.append(curstart)
            elif key == "out":
                outvals.append(end)
            else:
                outvals.append(next(remaining, ""))
        writer.writerow(outvals)
        curstart = None


if __name__ == "__main__":
    compact_in_out(sys.stdin, sys.stdout)

editwithsox

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import sys, csv
 
"""
Reads stdin as a CSV with at least the columns "source", "in" and "out"
"""
 
try:
    from shlex import quote   # Python 3
except ImportError:
    from pipes import quote   # Python 2

# wav to raw: sox input.wav --rate 44100 --bits 16 --channels 2 --encoding signed-integer tmp.raw
# raw to wav: sox --rate 44100 --bits 16 --channels 2 --encoding signed-integer tmp.raw output.wav

# Shell snippet that removes a leftover scratch file, if there is one.
_RESET_TMP = "if [ -f tmp.raw ]; then rm tmp.raw; fi;"


def _assemble_command(outpath):
    """Return the sox command that wraps tmp.raw into the wav file *outpath*."""
    return ("sox --rate 44100 --bits 16 --channels 1 --encoding signed-integer "
            "tmp.raw {0}").format(quote(outpath))


def sox_commands(rows, outpath="output.wav"):
    """Yield the shell command lines that cut and concatenate the segments.

    rows    -- iterable of dicts with at least "source", "in" and "out";
               an optional "output" entry switches the destination file
    outpath -- destination used for rows that have no "output" entry

    Every row appends the source's in..out segment as raw audio to tmp.raw.
    Whenever the destination changes, the accumulated tmp.raw is first
    written to the previous destination.  All values taken from the rows
    are shell-quoted, because the emitted lines are meant to be piped into
    a shell.  If there are no rows, only the initial cleanup line is
    produced.

    Raises KeyError if a row lacks "source", "in" or "out".
    """
    yield _RESET_TMP
    lastoutpath = None
    pending = False  # True while tmp.raw holds segments not yet assembled
    for row in rows:
        if 'output' in row:
            outpath = row['output']
            if outpath != lastoutpath:
                if lastoutpath:
                    # tmp => lastoutput
                    yield _assemble_command(lastoutpath)
                # reset tmp.raw
                yield _RESET_TMP
        yield ('sox {0} --rate 44100 --bits 16 --channels 1 -t raw - '
               'trim {1} ={2} >> tmp.raw').format(
                   quote(row['source']), quote(row['in']), quote(row['out']))
        lastoutpath = outpath
        pending = True

    if pending:
        yield _assemble_command(outpath)
        yield "rm tmp.raw"


def main(argv=None):
    """Read the CSV on stdin and print the sox commands on stdout.

    argv -- argument vector (defaults to sys.argv); argv[1], if present,
            is the output filename, otherwise "output.wav" is used
    """
    argv = sys.argv if argv is None else argv
    outpath = argv[1] if len(argv) > 1 else "output.wav"
    for line in sox_commands(csv.DictReader(sys.stdin), outpath):
        # Single-argument print(...) works under both Python 2 and 3.
        print(line)


if __name__ == "__main__":
    main()
Personal tools
Namespaces

Variants
Actions
Navigation
Toolbox