ROS1?rosbag的詳細(xì)使用并且使用python合并bag包的方法
在使用ros的時(shí)候經(jīng)常會用到rosbag來錄制或者回放算法,是個(gè)非常有用的工具。
rosbag 命令列表
| 命令 | 作用 |
|---|---|
| record | 錄制一個(gè)包,并且指定topic |
| info | 總結(jié)一個(gè)包的詳細(xì)信息 |
| play | 回放一個(gè)或者多個(gè)包,并且可以指定topic |
| check | 確定一個(gè)包是否可以在當(dāng)前系統(tǒng)中播放,或者是否可以遷移 |
| fix | 修復(fù)一個(gè)包使得它能在當(dāng)前系統(tǒng)中播放 |
| filter | 通過python腳本轉(zhuǎn)換包內(nèi)容 |
| compress | 壓縮包 |
| decompress | 解壓縮包 |
| reindex | 重新索引一個(gè)或者多個(gè)損壞的包 |
record 命令使用
record <topic-names> # 指定一個(gè)或者多個(gè)topic來錄制
$ rosbag record body_status gnss_imu
record -a, --all # 錄制所有topic
$ rosbag record -a
record -e, --regex # 錄制使用正則表達(dá)式匹配的topic
$ rosbag record -e "/(.*)_log/point"
rocord -p, -publish # (Melodic 新特性)當(dāng)開始錄制包時(shí),發(fā)布一個(gè)topic=”begin_write“
$ rosbag record -p
record -x, --exclude # 錄制時(shí)排除正則表達(dá)式匹配的topic
$ rosbag record -e "/(.*)_log/point" -x "(.*)/point"
record -q, --quiet # 錄制時(shí)抑制指定topic的log輸出
$ rosbag record -q /body_status
record -d, --duration # 指定topic錄制最大時(shí)常
$ rosbag record -d 30 /body_status $ rosbag record -d 3m /body_status $ rosbag record -d 3h /body_status
record -o, --output-prefix # 輸出包名前加上前綴
$ rosbag record -o output /topic_name /topic_name2
record -O, --output-name # 指定輸出包名
$ rosbag record -O output.bag /topic_name /topic_name2
record --split # 每隔設(shè)定的時(shí)常或者大小分割包錄制
$ rosbag record --split --size=1024 /topic_name $ rosbag record --split --duration=30 /topic_name $ rosbag record --split --duration=5m /topic_name $ rosbag record --split --duration=2h /topic_name
record --max-splits # (Kinetic 新特性)分割包的最大個(gè)數(shù)
$ rosbag record --split --max-splits 3 --duration=30 /topic_name
record -b, --buffsize # 使用內(nèi)部緩沖區(qū)(默認(rèn)值:256MB,無限為0)
$ rosbag record -b 1024 /topic_name
record --chunksize # 開辟緩沖區(qū),比buffsize更為先進(jìn)(默認(rèn)值: 768KB,無限為0)
$ rosbag record --chunksize=1024 /topic_name
record -l, --limit # 在topic中只錄制指定數(shù)量的消息
$ rosbag record -l 1000 /topic_name
record --node # 記錄指定節(jié)點(diǎn)訂閱的所有消息
$ rosbag record --node=/node
record -j, --bz2 # 設(shè)置錄制以bz2格式壓縮
$ rosbag record -j /topic_name
record --lz4 # 設(shè)置錄制以lz4格式壓縮
$ rosbag record --lz4 /topic_name
record --tcpnodelay# 訂閱主題時(shí)使用TCP_NODELAY傳輸 參考資料record --udp# 訂閱主題時(shí)使用UDP傳輸
info 指令使用
info <bag-files> # 查看一個(gè)或者多個(gè)包的詳細(xì)信息
$ rosbag info test.bag
info -y, --yaml # 以yaml格式輸出一個(gè)或者多個(gè)包的詳細(xì)信息
$ rosbag info -y test.bag
info -k, --key # 查看以yaml格式輸出,包內(nèi)key值對應(yīng)的數(shù)據(jù)
$ rosbag info -y -k topics test.bag
play 指令使用
play <bag_name> # 播放一個(gè)或者多個(gè)數(shù)據(jù)包
$ rosbag play test.bag
play -p, --prefix # 為所有輸出主題添加前綴
$ rosbag play -p test test.bag
play -q, --quiet # 抑制控制臺輸出
$ rosbag play -q test.bag
play -i, --immediate # 播放包不等待
$ rosbag play -i test.bag
play --pause # 開始播放包時(shí)暫停
$ rosbag play --pause test.bag
play --queue # 播放包時(shí)以設(shè)置的隊(duì)列大小播放 默認(rèn)值為100
$ rosbag play --queue=1000 test.bag
play --clock # 自動發(fā)布一個(gè)clocktopic
$ rosbag play --clock test.bag
play --hz # 以指定赫茲發(fā)布clock topic
$ rosbag play --hz=1 --clock test.bag
play -d, --delay # 指定延遲播放的時(shí)間
$ rosbag -d 5 test.bag
play -r, --rate # 指定播放速度
$ rosbag -r 10 test.bag
play -s, --start # 指定開始播放時(shí)間
$ rosbag play -s 5 test.bag
play -u,--duration # 設(shè)定播放時(shí)間
$ rosbag play -u 240 test.bag
play --skip-empty # 設(shè)置跳過沒有消息的數(shù)據(jù)的時(shí)間
$ rosbag play --skip-empty=1 test.bag
play -l, --loop # 循環(huán)播放
$ rosbag play -l test.bag
play -k, --keep-alive # 播放時(shí)保持活躍狀態(tài)
$ rosbag play -k test.bag
play --try-future-version # 即使不知道版本號也可以打開bag
$ rosbag play --try-future-version test.bag
play --topics # 指定對應(yīng)topics進(jìn)行播放
$ rosbag play test.bag --topics /topic1 /topic2
play --pause-topics # 當(dāng)回放數(shù)據(jù)時(shí),暫停指定topics
$ rosbag play test.bag --topics /topic1 /topic2 --pause-topics
play --bags # 指定多個(gè)bags進(jìn)行回放
$ rosbag play --bags=test.bag
play --wait-for-subscribers # 再發(fā)布之前,等待每個(gè)主題至少有一個(gè)訂閱者play --rate-control-topic= # 觀看給定的主題,如果上一次發(fā)布時(shí)間超過了<速率控制最大延遲>,請等待該主題再次發(fā)布后再繼續(xù)播放play --rate-control-max-delay= # 暫停前與<速率控制主題>的最大時(shí)間差 check 指令使用 check <file> # 確定一個(gè)包在當(dāng)前系統(tǒng)中是否可以播放。
$ rosbag check test.bag
check -g, --genrules # 生成一個(gè)名為RULEFILE的遷移規(guī)則文件
$ rosbag check -g diagnostics.bmr test.bag
check -a, --append # 加載后附加到現(xiàn)有數(shù)據(jù)遷移規(guī)則文件的末尾。
$ rosbag check -a -g diagnostics.bmr test.bag
check -n, --noplugins # 不加載規(guī)則文件
$ rosbag check -n test.bag
fix 指令使用
fix <in-bag> <out-bag> [rules.bmr] # 使用規(guī)則文件修復(fù)數(shù)據(jù)包
$ rosbag fix old.bag repaired.bag myrules.bmr
fix -n, --noplugins # 修復(fù)數(shù)據(jù)包不帶規(guī)則文件
$ rosbag fix -n old.bag repaired.bag
filter 指令使用
filter <in-bag> <out-bag> <expression> # 使用給定的Python表達(dá)式轉(zhuǎn)換數(shù)據(jù)文件。
$ rosbag filter my.bag only-tf.bag "topic == '/tf'"
compress 指令使用
compress <bag-files> # 使用BZ2格式壓縮包
$ rosbag compress *.bag
compress --output-dir=DIR # 指定路徑輸出文件
$ rosbag compress *.bag --output-dir=compressed
compress -f, --force # 如果已經(jīng)存在包,則強(qiáng)制覆蓋
$ rosbag compress -f *.bag
compress -q, --quiet # 抑制非關(guān)鍵信息
$ rosbag compress -q *.bag
compress -j, --bz2 # 使用bz2格式壓縮包
$ rosbag compress -j *.bag
compress --lz4 # 使用lz4格式壓縮包
$ rosbag compress --lz4 *.bag
decompress 指令使用 decompress <bag-files> # 解壓縮包
$ rosbag decompress *.bag
decompress --output-dir=DIR # 指定解壓縮的路徑
$ rosbag decompress --output-dir=uncompressed *.bag
decompress -f, --force # 如果已經(jīng)存在包,則強(qiáng)制覆蓋
$ rosbag decompress -f *.bag
decompress -q, --quiet # 抑制非關(guān)鍵信息
$ rosbag decompress -q *.bag
reindex 指令使用 reindex <bag-files> # 修復(fù)壞包
$ rosbag reindex *.bag
reindex --output-dir=DIR # 指定路徑
$ rosbag reindex --output-dir=reindexed *.bag
reindex -f, --force # 如果已經(jīng)存在包,則強(qiáng)制覆蓋
$ rosbag reindex -f *.bag
reindex -q, --quiet # 抑制非關(guān)鍵信息
$ rosbag reindex -q *.bag
2. 使用Python合并包代碼
#! /usr/bin/env python3
import os
import time
import argparse
from rosbag import Bag, Compression
def parse_compression(compression):
if compression == "none" or compression == "NONE":
compression = Compression.NONE
elif compression == "bz2":
compression = Compression.BZ2
elif compression == "lz4":
compression = Compression.LZ4
return compression
def get_files_list(arg_input, arg_select):
files = None
if " " in arg_input:
files = arg_input.split(" ")
elif "," in arg_input:
files = arg_input.split(",")
elif os.path.exists(args.input) or "*" in args.input:
rfind_index = arg_input.rfind("/")
dir_path = arg_input[:rfind_index]
files = os.listdir(dir_path)
files.sort() # 名稱排序
files = [dir_path + "/" + file for file in files]
if arg_select:
files = select_bags(files)
print("bags list:")
for i in range(len(files)):
print(str(i) + "." + files[i])
return files
def select_bags(files):
for i in range(len(files)):
print(str(i) + "." + files[i])
print("Please input bag numbers to merge. split by , or space", end=":")
s_b = input()
s_b_list = []
if "," in s_b:
s_b_list = s_b.split(",")
elif " " in s_b:
s_b_list = s_b.split(" ")
files = [files[int(x)] for x in s_b_list]
return files
def show_process_bar(total, i, start):
a = "*" * i
b = "." * (total - i)
c = (i / total) * 100
dur = time.perf_counter() - start
print("\r{:^3.0f}%[{}->{}]{:.2f}s".format(c,a,b,dur), end="")
def merge_bags(args):
print("start merge bags.")
compression = parse_compression(args.compression)
print(f"bag's compression mode is {compression}.")
files = get_files_list(args.input, args.select)
with Bag(args.output, "w", compression=compression) as o:
start = time.perf_counter()
for i in range(len(files)):
show_process_bar(len(files), i+1, start)
with Bag(files[i], "r") as ib:
for topic, msg, t in ib:
o.write(topic, msg, t)
show_process_bar(len(files), i+1, start)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Merge one or more bag files to one file.")
parser.add_argument("-o", "--output", help="Output bag's file path.",
default="output.bag", required=True)
parser.add_argument("-i", "--input", help="Input files bags name or path, split by , .",
required=True)
parser.add_argument("-c", "--compression", help="Compress the bag by bz2 or lz4",
default="lz4", choices=["none", "lz4", "bz2"])
parser.add_argument("-s", "--select", help="Select bags to merge.",
default=False)
parser.add_argument("-v", "--verbose", help="Show the verbose msg.")
args = parser.parse_args()
merge_bags(args) Reference:
http://wiki.ros.org/rosbag/Commandline
到此這篇關(guān)于ROS1 rosbag的詳細(xì)使用,并且使用python來合并bag包的文章就介紹到這了,更多相關(guān)ROS1 rosbag使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python虛擬機(jī)之描述器實(shí)現(xiàn)原理與源碼分析
在本篇文章當(dāng)中主要給大家介紹描述器背后的實(shí)現(xiàn)原理,通過分析?cpython對應(yīng)的源代碼了解與描述器相關(guān)的字節(jié)碼的指令,我們就可以真正了解到描述器背后的原理,需要的朋友可以參考下2023-05-05
Python OpenCV高斯金字塔與拉普拉斯金字塔的實(shí)現(xiàn)
這篇文章主要介紹了Python OpenCV高斯金字塔與拉普拉斯金字塔的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
如何基于OpenCV&Python實(shí)現(xiàn)霍夫變換圓形檢測
最近開始學(xué)習(xí)opencv,想檢測圖片上的圓環(huán),發(fā)現(xiàn)霍夫變換可以做這樣的效果出來,于是嘗試用霍夫變換做了下圓環(huán)檢測,這篇文章主要給大家介紹了基于OpenCV&Python實(shí)現(xiàn)霍夫變換圓形檢測的相關(guān)資料,需要的朋友可以參考下2021-08-08
Python從數(shù)據(jù)庫讀取大量數(shù)據(jù)批量寫入文件的方法
今天小編就為大家分享一篇Python從數(shù)據(jù)庫讀取大量數(shù)據(jù)批量寫入文件的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12
python基礎(chǔ)之while循環(huán)、for循環(huán)詳解及舉例
所謂循環(huán)結(jié)構(gòu)就是程序中控制某條或某些指令重復(fù)執(zhí)行的結(jié)構(gòu),下面這篇文章主要給大家介紹了關(guān)于python基礎(chǔ)之while循環(huán)、for循環(huán)的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04
Python 處理數(shù)據(jù)的實(shí)例詳解
這篇文章主要介紹了Python 處理數(shù)據(jù)的實(shí)例詳解的相關(guān)資料,這里主要介紹Python 常用的基礎(chǔ)知識并附實(shí)例,需要的朋友可以參考下2017-08-08
flask之郵件發(fā)送的實(shí)現(xiàn)示例
Flask-Mail是一個(gè)處理電子郵件發(fā)送的擴(kuò)展,它提供了簡單且易于使用的API,可以方便地發(fā)送電子郵件,本文就來介紹一下flask之郵件發(fā)送的實(shí)現(xiàn)示例,感興趣的可以了解一下2023-12-12
opencv-python圖像配準(zhǔn)(匹配和疊加)的實(shí)現(xiàn)
圖像配準(zhǔn)需是指對不同條件下得到的兩幅或多幅圖像進(jìn)行匹配、疊加的過程。本文詳細(xì)的介紹了如何使用,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-06-06
基于python進(jìn)行桶排序與基數(shù)排序的總結(jié)
今天小編就為大家分享一篇基于python進(jìn)行桶排序與基數(shù)排序的總結(jié),具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-05-05

